diff --git core/src/main/scala/kafka/api/DelayedProduce.scala core/src/main/scala/kafka/api/DelayedProduce.scala
new file mode 100644
index 0000000..663cf18
--- /dev/null
+++ core/src/main/scala/kafka/api/DelayedProduce.scala
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.server.{DelayedRequest, KafkaApis}
+import kafka.utils.{SystemTime, Logging}
+import kafka.common.ErrorMapping
+
+
+class DelayedProduce(keys: Seq[Any],
+                      request: RequestChannel.Request,
+                      localErrors: Array[Short],
+                      localOffsets: Array[Long],
+                      val produce: ProducerRequest,
+                      delayMs: Long,
+                      kafkaApis: KafkaApis)
+        extends DelayedRequest(keys, request, delayMs) with Logging {
+
+  /**
+   * Map of (topic, partition) -> partition status
+   * The values in this map don't need to be synchronized since updates to the
+   * values are effectively synchronized by the ProducerRequestPurgatory's
+   * update method
+   */
+  private val partitionStatus = keys.zipWithIndex.map { case (key, keyIndex) =>
+    // if there was an error in writing to the local replica's log, then don't
+    // wait for acks on this partition
+    val acksPending =
+      if (localErrors(keyIndex) == ErrorMapping.NoError) {
+        // Timeout error state will be cleared when requiredAcks are received
+        localErrors(keyIndex) = ErrorMapping.RequestTimedOutCode
+        true
+      }
+      else
+        false
+
+    // zipWithIndex avoids an O(n^2) indexOf scan and yields the correct index even for duplicate keys
+    (key, new PartitionStatus(acksPending, localErrors(keyIndex), localOffsets(keyIndex)))
+  }.toMap
+
+  def respond() {
+    // Collect the per-partition error codes and offsets for the response in a
+    // single pass, preserving the original key order of the request. This
+    // replaces a foldRight building two parallel lists with a simpler
+    // map + unzip that behaves identically.
+    val (errors, offsets) = keys.map(key => {
+      val status = partitionStatus(key)
+      (status.error, status.localOffset)
+    }).unzip
+    val response = new ProducerResponse(produce.versionId, produce.correlationId,
+                                        errors.toArray, offsets.toArray)
+
+    kafkaApis.requestChannel.sendResponse(new RequestChannel.Response(
+      request, new BoundedByteBufferSend(response), -1))
+  }
+
+  /**
+   * Returns true if this delayed produce request is satisfied (or more
+   * accurately, unblocked) -- this is the case if for every partition:
+   * Case A: This broker is not the leader: unblock - should return error.
+   * Case B: This broker is the leader:
+   *   B.1 - If there was a localError (when writing to the local log): unblock - should return error
+   *   B.2 - else, at least requiredAcks replicas should be caught up to this request.
+   *
+   * As partitions become acknowledged, we may be able to unblock
+   * DelayedFetchRequests that are pending on those partitions.
+   */
+  def isSatisfied(followerFetchPartition: Tuple2[String, Int],
+                  delayedProduce: DelayedProduce) = {
+    val (topic, partitionId) = followerFetchPartition
+    val partitionStatus = delayedProduce.partitionStatus(followerFetchPartition)
+    if (partitionStatus.acksPending) {
+      val leaderReplica =
+        kafkaApis.replicaManager.getLeaderReplica(topic, partitionId)
+      leaderReplica match {
+        case Some(leader) => {
+          if (leader.isLocal) {
+            val isr = leader.partition.inSyncReplicas
+            val numAcks = isr.count(r => {
+              if (!r.isLocal)
+                r.logEndOffset() >= partitionStatus.localOffset
+              else
+                true /* also count the local (leader) replica */
+            })
+            trace("Received %d/%d acks for produce request to %s-%d".format(
+              numAcks, delayedProduce.produce.requiredAcks,
+              topic, partitionId))
+            if (numAcks >= delayedProduce.produce.requiredAcks) {
+              partitionStatus.acksPending = false
+              partitionStatus.error = ErrorMapping.NoError
+              val topicData =
+                delayedProduce.produce.data.find(_.topic == topic).get
+              val partitionData =
+                topicData.partitionDatas.find(_.partition == partitionId).get
+              kafkaApis.maybeUnblockDelayedFetchRequests(
+                topic, Array(partitionData))
+            }
+          }
+          else {
+            debug("Broker not leader for %s-%d".format(topic, partitionId))
+            partitionStatus.setThisBrokerNotLeader()
+          }
+        }
+        case None =>
+          debug("Broker not leader for %s-%d".format(topic, partitionId))
+          partitionStatus.setThisBrokerNotLeader()
+      }
+    }
+
+    // delayedProduce unblocked if there are no partitions with pending acks
+    val res = !delayedProduce.partitionStatus.values.exists(_.acksPending)
+    if (res) {
+      val elapsed = SystemTime.milliseconds - delayedProduce.sTime
+      debug("DelayedProduce to %s-%d satisfied in %d ms.".format(
+        topic, partitionId, elapsed))
+    }
+    res
+  }
+
+  class PartitionStatus(var acksPending: Boolean,
+                        var error: Short,
+                        val localOffset: Long) {
+    def setThisBrokerNotLeader() {
+      error = ErrorMapping.NotLeaderForPartitionCode
+      acksPending = false
+    }
+  }
+}
+
diff --git core/src/main/scala/kafka/api/FetchResponse.scala core/src/main/scala/kafka/api/FetchResponse.scala
index 9ec5c33..c6e33f4 100644
--- core/src/main/scala/kafka/api/FetchResponse.scala
+++ core/src/main/scala/kafka/api/FetchResponse.scala
@@ -103,15 +103,15 @@ object TopicData {
   }
 }
 
-case class TopicData(topic: String, partitionData: Array[PartitionData]) {
-  val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes)
+case class TopicData(topic: String, partitionDatas: Array[PartitionData]) {
+  val sizeInBytes = 2 + topic.length + partitionDatas.foldLeft(4)(_ + _.sizeInBytes)
 
   // need to override equals due to brokern java-arrays equals functionality
   override def equals(other: Any): Boolean = {
     other match {
       case that: TopicData =>
         ( topic == that.topic &&
-          partitionData.toSeq == that.partitionData.toSeq )
+          partitionDatas.toSeq == that.partitionDatas.toSeq )
       case _ => false
     }
   }
@@ -124,11 +124,11 @@ class TopicDataSend(val topicData: TopicData) extends Send {
 
   private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
   Utils.writeShortString(buffer, topicData.topic, "UTF-8")
-  buffer.putInt(topicData.partitionData.length)
+  buffer.putInt(topicData.partitionDatas.length)
   buffer.rewind()
 
-  val sends = new MultiSend(topicData.partitionData.map(new PartitionDataSend(_)).toList) {
-    val expectedBytesToWrite = topicData.partitionData.foldLeft(0)(_ + _.sizeInBytes)
+  val sends = new MultiSend(topicData.partitionDatas.map(new PartitionDataSend(_)).toList) {
+    val expectedBytesToWrite = topicData.partitionDatas.foldLeft(0)(_ + _.sizeInBytes)
   }
 
   def complete = sent >= size
@@ -175,7 +175,7 @@ case class FetchResponse(versionId: Short,
   def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
     val messageSet = topicMap.get(topic) match {
       case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty)
+        TopicData.findPartition(topicData.partitionDatas, partition).map(_.messages).getOrElse(MessageSet.Empty)
       case None =>
         MessageSet.Empty
     }
@@ -185,7 +185,7 @@ case class FetchResponse(versionId: Short,
   def highWatermark(topic: String, partition: Int): Long = {
     topicMap.get(topic) match {
       case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionData, partition).map(_.hw).getOrElse(-1L)
+        TopicData.findPartition(topicData.partitionDatas, partition).map(_.hw).getOrElse(-1L)
       case None => -1L
     }
   }
diff --git core/src/main/scala/kafka/api/ProducerRequest.scala core/src/main/scala/kafka/api/ProducerRequest.scala
index a4a7184..0b8d068 100644
--- core/src/main/scala/kafka/api/ProducerRequest.scala
+++ core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -21,6 +21,7 @@ import java.nio._
 import kafka.message._
 import kafka.utils._
 
+
 object ProducerRequest {
   val CurrentVersion: Short = 0
 
@@ -56,7 +57,7 @@ case class ProducerRequest( versionId: Short,
                             correlationId: Int,
                             clientId: String,
                             requiredAcks: Short,
-                            ackTimeout: Int,
+                            ackTimeoutSecs: Int,
                             data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
 
   def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) =
@@ -67,13 +68,13 @@ case class ProducerRequest( versionId: Short,
     buffer.putInt(correlationId)
     Utils.writeShortString(buffer, clientId, "UTF-8")
     buffer.putShort(requiredAcks)
-    buffer.putInt(ackTimeout)
+    buffer.putInt(ackTimeoutSecs)
     //save the topic structure
     buffer.putInt(data.size) //the number of topics
     for(topicData <- data) {
       Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic
-      buffer.putInt(topicData.partitionData.size) //the number of partitions
-      for(partitionData <- topicData.partitionData) {
+      buffer.putInt(topicData.partitionDatas.size) //the number of partitions
+      for(partitionData <- topicData.partitionDatas) {
         buffer.putInt(partitionData.partition)
         buffer.putInt(partitionData.messages.getSerialized().limit)
         buffer.put(partitionData.messages.getSerialized())
@@ -85,10 +86,10 @@ case class ProducerRequest( versionId: Short,
   def sizeInBytes: Int = {
     var size = 0 
     //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
-    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4;
+    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4
     for(topicData <- data) {
 	    size += 2 + topicData.topic.length + 4
-      for(partitionData <- topicData.partitionData) {
+      for(partitionData <- topicData.partitionDatas) {
         size += 4 + 4 + partitionData.messages.sizeInBytes.asInstanceOf[Int]
       }
     }
@@ -102,12 +103,13 @@ case class ProducerRequest( versionId: Short,
         ( correlationId == that.correlationId &&
           clientId == that.clientId &&
           requiredAcks == that.requiredAcks &&
-          ackTimeout == that.ackTimeout &&
+          ackTimeoutSecs == that.ackTimeoutSecs &&
           data.toSeq == that.data.toSeq )
       case _ => false
     }
   }
 
-  def topicPartitionCount = data.foldLeft(0)(_ + _.partitionData.length)
+  def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDatas.length)
+
+}
 
-}
\ No newline at end of file
diff --git core/src/main/scala/kafka/common/ErrorMapping.scala core/src/main/scala/kafka/common/ErrorMapping.scala
index 958e165..99ed525 100644
--- core/src/main/scala/kafka/common/ErrorMapping.scala
+++ core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -38,6 +38,7 @@ object ErrorMapping {
   val NoLeaderForPartitionCode : Short = 6
   val NotLeaderForPartitionCode : Short = 7
   val UnknownTopicCode : Short = 8
+  val RequestTimedOutCode : Short = 9
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -48,6 +49,7 @@ object ErrorMapping {
       classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
+      classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
     ).withDefaultValue(UnknownCode)
   
diff --git core/src/main/scala/kafka/common/RequestTimedOutException.scala core/src/main/scala/kafka/common/RequestTimedOutException.scala
new file mode 100644
index 0000000..faedea8
--- /dev/null
+++ core/src/main/scala/kafka/common/RequestTimedOutException.scala
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+
+/**
+ * Thrown when a produce request times out - i.e., if one or more partitions it
+ * sends messages to receives fewer than the requiredAcks that is specified in
+ * the produce request.
+ */
+class RequestTimedOutException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file
diff --git core/src/main/scala/kafka/producer/SyncProducer.scala core/src/main/scala/kafka/producer/SyncProducer.scala
index e50b2cf..fb5a903 100644
--- core/src/main/scala/kafka/producer/SyncProducer.scala
+++ core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -101,7 +101,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
     for( topicData <- producerRequest.data ) {
-      for( partitionData <- topicData.partitionData ) {
+      for( partitionData <- topicData.partitionDatas ) {
 	      verifyMessageSize(partitionData.messages)
         val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
         trace("Got message set with " + setSize + " bytes to send")
diff --git core/src/main/scala/kafka/producer/SyncProducerConfig.scala core/src/main/scala/kafka/producer/SyncProducerConfig.scala
index 6e12109..444d32b 100644
--- core/src/main/scala/kafka/producer/SyncProducerConfig.scala
+++ core/src/main/scala/kafka/producer/SyncProducerConfig.scala
@@ -51,11 +51,11 @@ trait SyncProducerConfigShared {
   /* the client application sending the producer requests */
   val clientId = Utils.getString(props,"producer.request.client_id","")
 
-  /* the required_acks of the producer requests */
-  val requiredAcks = Utils.getShort(props,"producer.request.required_acks",0)
+  /* the required acks of the producer requests */
+  val requiredAcks = Utils.getShort(props,"producer.request.required.acks",0)
 
-  /* the ack_timeout of the producer requests */
-  val ackTimeout = Utils.getInt(props,"producer.request.ack_timeout",1)
+  /* the ack timeout of the producer requests */
+  val ackTimeout = Utils.getInt(props,"producer.request.ack.timeout.secs",1)
 }
 
 object SyncProducerConfig {
diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 69e1a65..1bc6ed8 100644
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -179,7 +179,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
           .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
         var msgIdx = -1
         val errors = new ListBuffer[(String, Int)]
-        for( topic <- topicData; partition <- topic.partitionData ) {
+        for( topic <- topicData; partition <- topic.partitionDatas ) {
           msgIdx += 1
           if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
             errors.append((topic.topic, partition.partition))
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index 2b11672..82be85f 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -33,13 +33,15 @@ import scala.math._
 import java.lang.IllegalStateException
 import kafka.network.RequestChannel.Response
 
-
 /**
  * Logic to handle the various Kafka requests
  */
-class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
-                val replicaManager: ReplicaManager, val kafkaZookeeper: KafkaZooKeeper) extends Logging {
+class KafkaApis(val requestChannel: RequestChannel,
+                val logManager: LogManager,
+                val replicaManager: ReplicaManager,
+                val kafkaZookeeper: KafkaZooKeeper) extends Logging {
 
+  private val produceRequestPurgatory = new ProducerRequestPurgatory
   private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -86,6 +88,23 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse), -1))
   }
 
+  /**
+   * Check if the partitionDatas from a produce request can unblock any
+   * DelayedFetch requests.
+   */
+  def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
+    val satisfied = new mutable.ArrayBuffer[DelayedFetch]
+    for(partitionData <- partitionDatas)
+      satisfied ++= fetchRequestPurgatory.update(
+        (topic, partitionData.partition), partitionData)
+    trace("Produce request to %s unblocked %d DelayedFetchRequests.".format(topic, satisfied.size))
+    // send any newly unblocked responses
+    for(fetchReq <- satisfied) {
+      val topicData = readMessageSets(fetchReq.fetch)
+      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+      requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response), -1))
+    }
+  }
 
   /**
    * Handle a produce request
@@ -97,20 +116,25 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       requestLogger.trace("Producer request " + request.toString)
 
     val response = produce(produceRequest)
-    debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
-    
-    // Now check any outstanding fetches this produce just unblocked
-    var satisfied = new mutable.ArrayBuffer[DelayedFetch]
-    for(topicData <- produceRequest.data) {
-      for(partition <- topicData.partitionData)
-        satisfied ++= fetchRequestPurgatory.update((topicData.topic, partition.partition), topicData)
+    if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1) {
+      debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
+
+      for (topicData <- produceRequest.data)
+        maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDatas)
     }
-    // send any newly unblocked responses
-    for(fetchReq <- satisfied) {
-       val topicData = readMessageSets(fetchReq.fetch)
-       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
-      requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response), -1))
+    else {
+      val keys = produceRequest.data.flatMap(topicData => {
+        val topic = topicData.topic
+        topicData.partitionDatas.map(partitionData => {
+          (topic, partitionData.partition)
+        })
+      })
+      val delayedProduce = new DelayedProduce(
+        keys, request,
+        response.errors, response.offsets,
+        produceRequest, produceRequest.ackTimeoutSecs.toLong * 1000L, this)
+      produceRequestPurgatory.watch(delayedProduce)
     }
   }
 
@@ -124,11 +148,13 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
 
     var msgIndex = -1
     for(topicData <- request.data) {
-      for(partitionData <- topicData.partitionData) {
+      for(partitionData <- topicData.partitionDatas) {
         msgIndex += 1
         BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
         BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
         try {
+          // TODO: should use replicaManager for ensurePartitionLeaderOnThisBroker?
+          // although this ties in with KAFKA-352
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
@@ -173,15 +199,28 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
         requestChannel.sendResponse(channelResponse)
     }
 
-    if(fetchRequest.replicaId != -1)
+    if(fetchRequest.replicaId != -1) {
       maybeUpdatePartitionHW(fetchRequest)
+      // after updating HW, some delayed produce requests may be unblocked
+      val satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
+      fetchRequest.offsetInfo.foreach(topicOffsetInfo => {
+        topicOffsetInfo.partitions.foreach(partition => {
+          satisfiedProduceRequests ++= produceRequestPurgatory.update(
+            (topicOffsetInfo.topic, partition), (topicOffsetInfo.topic, partition)
+          )
+        })
+      })
+      trace("Replica %d fetch unblocked %d DelayedProduce requests.".format(
+        fetchRequest.replicaId, satisfiedProduceRequests.size))
+      satisfiedProduceRequests.foreach(_.respond())
+    }
 
     // if there are enough bytes available right now we can answer the request, otherwise we have to punt
     val availableBytes = availableFetchBytes(fetchRequest)
     if(fetchRequest.maxWait <= 0 || availableBytes >= fetchRequest.minBytes) {
       val topicData = readMessageSets(fetchRequest)
       debug("Returning fetch response %s for fetch request with correlation id %d"
-        .format(topicData.map(_.partitionData.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
+        .format(topicData.map(_.partitionDatas.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response), -1))
     } else {
@@ -355,18 +394,18 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
    */
   class DelayedFetch(keys: Seq[Any], request: RequestChannel.Request, val fetch: FetchRequest, delayMs: Long, initialSize: Long) extends DelayedRequest(keys, request, delayMs) {
     val bytesAccumulated = new AtomicLong(initialSize)
-   }
+  }
 
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, TopicData] {
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData] {
     
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
-    def checkSatisfied(topicData: TopicData, delayedFetch: DelayedFetch): Boolean = {
-      val messageDataSize = topicData.partitionData.map(_.messages.sizeInBytes).sum
+    def checkSatisfied(partitionData: PartitionData, delayedFetch: DelayedFetch): Boolean = {
+      val messageDataSize = partitionData.messages.sizeInBytes
       val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
       accumulatedSize >= delayedFetch.fetch.minBytes
     }
@@ -380,4 +419,6 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response), -1))
     }
   }
+
 }
+
diff --git core/src/main/scala/kafka/server/KafkaServer.scala core/src/main/scala/kafka/server/KafkaServer.scala
index b2c9e47..f965400 100644
--- core/src/main/scala/kafka/server/KafkaServer.scala
+++ core/src/main/scala/kafka/server/KafkaServer.scala
@@ -18,13 +18,14 @@
 package kafka.server
 
 import java.io.File
-import kafka.network.{SocketServerStats, SocketServer}
 import kafka.log.LogManager
 import kafka.utils._
 import java.util.concurrent._
 import atomic.AtomicBoolean
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.ZkClient
+import kafka.network.{SocketServerStats, SocketServer}
+
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -73,10 +74,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
     kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica, makeLeader, makeFollower)
 
-    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient)
+    val produceRequestPurgatory = new ProducerRequestPurgatory
+
+    replicaManager = new ReplicaManager(
+      config, time, produceRequestPurgatory, kafkaZookeeper.getZookeeperClient)
+
+    apis = new KafkaApis(
+      socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
+
+    requestHandlerPool = new KafkaRequestHandlerPool(
+      socketServer.requestChannel, apis, config.numIoThreads)
 
-    apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
-    requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
     socketServer.startup
 
     Mx4jLoader.maybeLoad
@@ -152,6 +160,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
   def getLogManager(): LogManager = logManager
 
   def getStats(): SocketServerStats = socketServer.stats
-}
 
+}
 
diff --git core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
new file mode 100644
index 0000000..112d3fe
--- /dev/null
+++ core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+
+import kafka.api.DelayedProduce
+
+
+/**
+ * A holding pen for produce requests waiting to be satisfied.
+ */
+private[kafka] class ProducerRequestPurgatory
+        extends RequestPurgatory[DelayedProduce, Tuple2[String, Int]] {
+
+  protected def checkSatisfied(fetchRequestPartition: Tuple2[String, Int],
+                               delayedProduce: DelayedProduce) =
+    delayedProduce.isSatisfied(fetchRequestPartition, delayedProduce)
+
+  /**
+   * Handle an expired delayed request
+   */
+  protected def expire(delayedProduce: DelayedProduce) {
+    delayedProduce.respond()
+  }
+}
+
diff --git core/src/main/scala/kafka/server/ReplicaFetcherThread.scala core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 7cdf5e6..aa5ba2b 100644
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -54,7 +54,7 @@ class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker,
         // append messages to local log
         replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId))
         // record the hw sent by the leader for this partition
-        val followerHighWatermark = replica.logEndOffset().min(response.data.head.partitionData.head.hw)
+        val followerHighWatermark = replica.logEndOffset().min(response.data.head.partitionDatas.head.hw)
         replica.highWatermark(Some(followerHighWatermark))
         trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
           .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.highWatermark()))
diff --git core/src/main/scala/kafka/server/ReplicaManager.scala core/src/main/scala/kafka/server/ReplicaManager.scala
index 0012f1d..436e210 100644
--- core/src/main/scala/kafka/server/ReplicaManager.scala
+++ core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -26,12 +26,18 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
 import kafka.common.InvalidPartitionException
 
-class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
+
+class ReplicaManager(val config: KafkaConfig,
+                     time: Time,
+                     producerRequestPurgatory: ProducerRequestPurgatory,
+                     zkClient: ZkClient)
+        extends Logging {
 
   private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
   private var leaderReplicas = new ListBuffer[Partition]()
   private val leaderReplicaLock = new ReentrantLock()
   private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
+
   // start ISR expiration thread
   isrExpirationScheduler.startUp
   isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
@@ -47,7 +53,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
         replica.log match {
           case None =>
             replica.log = Some(log)
-          case Some(log) => // nothing to do since log already exists
+          case Some(_) => // nothing to do since log already exists
         }
       case None =>
         partition.addReplica(localReplica)
@@ -162,6 +168,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
     }finally {
       leaderReplicaLock.unlock()
     }
+
     replica.log match {
       case Some(log) =>  // log is already started
         log.recoverUptoLastCheckpointedHW()
@@ -170,6 +177,19 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
     // get leader for this replica
     val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
     val isReplicaAFollower = replica.getIfFollowerAndLeader()
+
+    if (!isReplicaAFollower._1) {
+      /*
+       * If this replica was a leader and now becomes a follower,
+       * respond to all pending produce requests for this partition with error.
+       * This is not strictly required, but helps unblock
+       * DelayedProduceRequests sooner.
+       */
+      debug("Updating DelayedProduceRequest's to %s-%d".format(
+        replica.topic, replica.partition.partitionId))
+      val key = (replica.topic, replica.partition.partitionId)
+      producerRequestPurgatory.update(key, key)
+    }
     // Become follower only if it is not already following the same leader
     if(!(isReplicaAFollower._1 && (isReplicaAFollower._2 == leaderBroker.id))) {
       // stop fetcher thread to previous leader
@@ -179,7 +199,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
     }
   }
 
-  def maybeShrinkISR(): Unit = {
+  def maybeShrinkISR() {
     try {
       info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
         .format(config.keepInSyncTimeMs))
@@ -215,7 +235,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
       " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
   }
 
-  def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
+  def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) {
     val replicaOpt = getReplica(topic, partition, replicaId)
     replicaOpt match {
       case Some(replica) =>
@@ -247,3 +267,4 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex
     allReplicas.foreach(_._2.assignedReplicas().foreach(_.close()))
   }
 }
+
diff --git core/src/main/scala/kafka/server/RequestPurgatory.scala core/src/main/scala/kafka/server/RequestPurgatory.scala
index f82d2e7..be98f3e 100644
--- core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -30,6 +30,7 @@ import kafka.utils._
  * for example a key could be a (topic, partition) pair.
  */
 class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
+  val sTime = SystemTime.milliseconds
   val satisfied = new AtomicBoolean(false)
 }
 
diff --git core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index b26cf25..875b566 100644
--- core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -96,7 +96,8 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
+    val apis = new KafkaApis(
+      requestChannel, logManager, replicaManager, kafkaZookeeper)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
diff --git core/src/test/scala/unit/kafka/producer/ProducerTest.scala core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 68c2c1f..50e8f39 100644
--- core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -30,6 +30,8 @@ import org.apache.log4j.{Level, Logger}
 import org.junit.Assert._
 import org.junit.Test
 import kafka.utils._
+import java.util
+
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
@@ -85,22 +87,29 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
 
   @Test
   def testZKSendToNewTopic() {
-    val props = new Properties()
-    props.put("serializer.class", "kafka.serializer.StringEncoder")
-    props.put("partitioner.class", "kafka.utils.StaticPartitioner")
-    props.put("zk.connect", TestZKUtils.zookeeperConnect)
+    val props1 = new util.Properties()
+    props1.put("serializer.class", "kafka.serializer.StringEncoder")
+    props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
+    props1.put("zk.connect", TestZKUtils.zookeeperConnect)
+    props1.put("producer.request.required.acks", "2")
 
-    val config = new ProducerConfig(props)
+    val props2 = new util.Properties()
+    props2.putAll(props1)
+    props2.put("producer.request.required.acks", "3")
+
+    val config1 = new ProducerConfig(props1)
+    val config2 = new ProducerConfig(props2)
 
     // create topic with 1 partition and await leadership
-    CreateTopicCommand.createTopic(zkClient, "new-topic", 1)
+    CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
     TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
 
-    val producer = new Producer[String, String](config)
+    val producer1 = new Producer[String, String](config1)
+    val producer2 = new Producer[String, String](config2)
     try {
       // Available partition ids should be 0.
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+      producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
       // get the leader
       val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
       assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
@@ -118,13 +127,27 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
       assertEquals(new Message("test1".getBytes), messageSet.next.message)
       assertTrue("Message set should have 1 message", messageSet.hasNext)
       assertEquals(new Message("test1".getBytes), messageSet.next.message)
+      assertFalse("Message set should not have any more messages", messageSet.hasNext)
     } catch {
       case e: Exception => fail("Not expected", e)
     } finally {
-      producer.close
+      producer1.close()
+    }
+
+    try {
+      producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test2")))
+      fail("Should have timed out for 3 acks.")
+    }
+    catch {
+      case se: FailedToSendMessageException => true
+      case e => fail("Not expected", e)
+    }
+    finally {
+      producer2.close()
     }
   }
 
+
   @Test
   def testZKSendWithDeadBroker() {
     val props = new Properties()
diff --git core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 9ac32b5..2c3ffa2 100644
--- core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -99,7 +99,9 @@ class ISRExpirationTest extends JUnit3Suite {
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     EasyMock.replay(zkClient)
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, time, zkClient)
+    val noProduceRequestPurgatory = null
+    val replicaManager = new ReplicaManager(
+      configs.head, time, noProduceRequestPurgatory, zkClient)
     try {
       val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
       // create leader log
