diff --git core/src/main/scala/kafka/api/FetchRequest.scala core/src/main/scala/kafka/api/FetchRequest.scala
index 7e1fa47..a472cb3 100644
--- core/src/main/scala/kafka/api/FetchRequest.scala
+++ core/src/main/scala/kafka/api/FetchRequest.scala
@@ -18,9 +18,9 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.utils.Utils
-import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
-import kafka.common.{KafkaException, FetchRequestFormatException}
+import kafka.utils.{nonthreadsafe, Utils}
+import scala.collection.immutable.Map
+import kafka.common.KafkaException
 
 object OffsetDetail {
 
@@ -73,6 +73,8 @@ case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long],
   }
 }
 
+case class PartitionFetchInfo(offset: Long, fetchSize: Int)
+
 object FetchRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultCorrelationId = -1
@@ -85,18 +87,23 @@ object FetchRequest {
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = Utils.readShortString(buffer, "UTF-8")
+    val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val replicaId = buffer.getInt
     val maxWait = buffer.getInt
     val minBytes = buffer.getInt
-    val offsetsCount = buffer.getInt
-    val offsetInfo = new Array[OffsetDetail](offsetsCount)
-    for(i <- 0 until offsetInfo.length)
-      offsetInfo(i) = OffsetDetail.readFrom(buffer)
-
-    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val offset = buffer.getLong
+        val fetchSize = buffer.getInt
+        ((topic, partitionId), (PartitionFetchInfo(offset, fetchSize)))
+      })
+    })
+    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
   }
-
 }
 
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
@@ -105,48 +112,61 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         replicaId: Int = FetchRequest.DefaultReplicaId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
-                        offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) {
-
-  // ensure that a topic "X" appears in at most one OffsetDetail
-  def validate() {
-    if(offsetInfo == null)
-      throw new FetchRequestFormatException("FetchRequest has null offsetInfo")
-
-    // We don't want to get fancy with groupBy's and filter's since we just want the first occurrence
-    var topics = Set[String]()
-    val iter = offsetInfo.iterator
-    while(iter.hasNext) {
-      val offsetData = iter.next()
-      val topic = offsetData.topic
-      if(topics.contains(topic))
-        throw new FetchRequestFormatException("FetchRequest has multiple OffsetDetails for topic: " + topic)
-      else
-        topics += topic
-    }
-  }
+                        requestInfo: Map[(String, Int), PartitionFetchInfo])
+        extends RequestOrResponse(Some(RequestKeys.Fetch)) {
 
-  def writeTo(buffer: ByteBuffer) {
-    // validate first
-    validate()
+  val groupedRequestInfo = requestInfo.groupBy(_._1._1)
 
+  def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
-    Utils.writeShortString(buffer, clientId, "UTF-8")
+    Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
     buffer.putInt(replicaId)
     buffer.putInt(maxWait)
     buffer.putInt(minBytes)
-    buffer.putInt(offsetInfo.size)
-    for(topicDetail <- offsetInfo) {
-      topicDetail.writeTo(buffer)
+    buffer.putInt(groupedRequestInfo.size) // topic count
+    groupedRequestInfo.foreach {
+      case (topic, partitionFetchInfos) => {
+        Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+        buffer.putInt(partitionFetchInfos.size) // partition count
+        partitionFetchInfos.foreach{
+          case ((_, partitionId), fetchInfo) => {
+            buffer.putInt(partitionId)
+            buffer.putLong(fetchInfo.offset)
+            buffer.putInt(fetchInfo.fetchSize)
+          }
+        }
+      }
+
     }
   }
 
-  def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
+  def sizeInBytes: Int = {
+    2 + /* versionId */
+    4 + /* correlationId */
+    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
+    4 + /* replicaId */
+    4 + /* maxWait */
+    4 + /* minBytes */
+    4 + /* topic count */
+    groupedRequestInfo.foldLeft(0)((foldedTopics, currTopic) => {
+      val (topic, partitionFetchInfos) = currTopic
+      foldedTopics +
+      Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+      4 + /* partition count */
+      partitionFetchInfos.size * (
+        4 + /* partition id */
+        8 + /* offset */
+        4 /* fetch size */
+      )
+    })
+  }
 
-  def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
+  def numPartitions = requestInfo.size
 }
 
 
+@nonthreadsafe
 class FetchRequestBuilder() {
   private var correlationId = FetchRequest.DefaultCorrelationId
   private val versionId = FetchRequest.CurrentVersion
@@ -154,13 +174,10 @@ class FetchRequestBuilder() {
   private var replicaId = FetchRequest.DefaultReplicaId
   private var maxWait = FetchRequest.DefaultMaxWait
   private var minBytes = FetchRequest.DefaultMinBytes
-  private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]]
+  private val requestMap = new collection.mutable.HashMap[(String, Int), PartitionFetchInfo]
 
   def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
-    val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]()))
-    topicData._1.append(partition)
-    topicData._2.append(offset)
-    topicData._3.append(fetchSize)
+    requestMap.put((topic, partition), PartitionFetchInfo(offset, fetchSize))
     this
   }
 
@@ -190,9 +207,6 @@ class FetchRequestBuilder() {
   }
 
   def build() = {
-    val offsetDetails = requestMap.map{ topicData =>
-      new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray)
-    }
-    new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail])
+    FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
   }
 }
diff --git core/src/main/scala/kafka/api/FetchResponse.scala core/src/main/scala/kafka/api/FetchResponse.scala
index 1d581d4..001c52b 100644
--- core/src/main/scala/kafka/api/FetchResponse.scala
+++ core/src/main/scala/kafka/api/FetchResponse.scala
@@ -26,20 +26,28 @@ import kafka.utils.Utils
 
 object PartitionData {
   def readFrom(buffer: ByteBuffer): PartitionData = {
-    val error = buffer.getShort
     val partition = buffer.getInt
+    val error = buffer.getShort
     val initialOffset = buffer.getLong
-    val hw = buffer.getLong()
+    val hw = buffer.getLong
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
     new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
   }
+
+  val headerSize =
+    4 + /* partition */
+    2 + /* error code */
+    8 + /* initialOffset */
+    8 + /* high watermark */
+    4 /* messageSetSize */
 }
 
 case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
-  val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8
+
+  val sizeInBytes = PartitionData.headerSize + messages.sizeInBytes.intValue()
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
 }
@@ -50,17 +58,17 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send {
   private val messageSize = partitionData.messages.sizeInBytes
   private var messagesSentSize = 0L
 
-  private val buffer = ByteBuffer.allocate(26)
-  buffer.putShort(partitionData.error)
+  private val buffer = ByteBuffer.allocate(PartitionData.headerSize)
   buffer.putInt(partitionData.partition)
+  buffer.putShort(partitionData.error)
   buffer.putLong(partitionData.initialOffset)
   buffer.putLong(partitionData.hw)
   buffer.putInt(partitionData.messages.sizeInBytes.intValue())
   buffer.rewind()
 
-  def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
+  override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
 
-  def writeTo(channel: GatheringByteChannel): Int = {
+  override def writeTo(channel: GatheringByteChannel): Int = {
     var written = 0
     if(buffer.hasRemaining)
       written += channel.write(buffer)
@@ -75,63 +83,43 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send {
 
 object TopicData {
   def readFrom(buffer: ByteBuffer): TopicData = {
-    val topic = Utils.readShortString(buffer, "UTF-8")
+    val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val partitionCount = buffer.getInt
-    val partitions = new Array[PartitionData](partitionCount)
-    for(i <- 0 until partitionCount)
-      partitions(i) = PartitionData.readFrom(buffer)
-    new TopicData(topic, partitions.sortBy(_.partition))
+    val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
+      val partitionData = PartitionData.readFrom(buffer)
+      ((topic, partitionData.partition), partitionData)
+    })
+    TopicData(topic, Map(topicPartitionDataPairs:_*))
   }
 
-  def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = {
-    if(data == null || data.size == 0)
-      return None
-
-    var (low, high) = (0, data.size-1)
-    while(low <= high) {
-      val mid = (low + high) / 2
-      val found = data(mid)
-      if(found.partition == partition)
-        return Some(found)
-      else if(partition < found.partition)
-        high = mid - 1
-      else
-        low = mid + 1
-    }
-    None
-  }
+  def headerSize(topic: String) =
+    Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+    4 /* partition count */
 }
 
-case class TopicData(topic: String, partitionDataArray: Array[PartitionData]) {
-  val sizeInBytes = 2 + topic.length + partitionDataArray.foldLeft(4)(_ + _.sizeInBytes)
+case class TopicData(topic: String, partitionData: Map[(String, Int), PartitionData]) {
+  val sizeInBytes =
+    TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes)
 
-  // need to override equals due to brokern java-arrays equals functionality
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: TopicData =>
-        ( topic == that.topic &&
-          partitionDataArray.toSeq == that.partitionDataArray.toSeq )
-      case _ => false
-    }
-  }
+  val headerSize = TopicData.headerSize(topic)
 }
 
 class TopicDataSend(val topicData: TopicData) extends Send {
-  val size = topicData.sizeInBytes
+  private val size = topicData.sizeInBytes
 
-  var sent = 0
+  private var sent = 0
 
-  private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4)
-  Utils.writeShortString(buffer, topicData.topic, "UTF-8")
-  buffer.putInt(topicData.partitionDataArray.length)
+  override def complete = sent >= size
+
+  private val buffer = ByteBuffer.allocate(topicData.headerSize)
+  Utils.writeShortString(buffer, topicData.topic, RequestOrResponse.DefaultCharset)
+  buffer.putInt(topicData.partitionData.size)
   buffer.rewind()
 
-  val sends = new MultiSend(topicData.partitionDataArray.map(new PartitionDataSend(_)).toList) {
-    val expectedBytesToWrite = topicData.partitionDataArray.foldLeft(0)(_ + _.sizeInBytes)
+  val sends = new MultiSend(topicData.partitionData.toList.map(d => new PartitionDataSend(d._2))) {
+    val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize
   }
 
-  def complete = sent >= size
-
   def writeTo(channel: GatheringByteChannel): Int = {
     expectIncomplete()
     var written = 0
@@ -146,68 +134,71 @@ class TopicDataSend(val topicData: TopicData) extends Send {
 }
 
 
+object FetchResponse {
 
+  val headerSize =
+    2 + /* versionId */
+    4 + /* correlationId */
+    4 /* topic count */
 
-object FetchResponse {
   def readFrom(buffer: ByteBuffer): FetchResponse = {
     val versionId = buffer.getShort
-    val errorCode = buffer.getShort
     val correlationId = buffer.getInt
-    val dataCount = buffer.getInt
-    val data = new Array[TopicData](dataCount)
-    for(i <- 0 until data.length)
-      data(i) = TopicData.readFrom(buffer)
-    new FetchResponse(versionId, correlationId, data, errorCode)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topicData = TopicData.readFrom(buffer)
+      topicData.partitionData.values.map(partitionData => ((topicData.topic, partitionData.partition), partitionData))
+    })
+    FetchResponse(versionId, correlationId, Map(pairs:_*))
   }
 }
 
 
 case class FetchResponse(versionId: Short,
                          correlationId: Int,
-                         data: Array[TopicData],
-                         errorCode: Short = ErrorMapping.NoError)  {
+                         data: Map[(String, Int), PartitionData])  {
 
-  val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes)
+  lazy val dataGroupedByTopic = data.groupBy(_._1._1)
 
-  lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
+  lazy val hasError = data.values.exists(_.error != ErrorMapping.NoError)
 
-  def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
-    val messageSet = topicMap.get(topic) match {
-      case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionDataArray, partition).map(_.messages).getOrElse(MessageSet.Empty)
-      case None =>
-        MessageSet.Empty
-    }
-    messageSet.asInstanceOf[ByteBufferMessageSet]
-  }
+  val sizeInBytes =
+    FetchResponse.headerSize +
+    dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
+      val topicData = TopicData(curr._1, curr._2)
+      folded +
+      topicData.sizeInBytes
+    })
 
-  def highWatermark(topic: String, partition: Int): Long = {
-    topicMap.get(topic) match {
-      case Some(topicData) =>
-        TopicData.findPartition(topicData.partitionDataArray, partition).map(_.hw).getOrElse(-1L)
-      case None => -1L
-    }
-  }
+  def messageSet(topic: String, partition: Int): ByteBufferMessageSet =
+    data.get((topic, partition)).map(_.messages).getOrElse(MessageSet.Empty).asInstanceOf[ByteBufferMessageSet]
+
+  def highWatermark(topic: String, partition: Int): Long =
+    data.get((topic, partition)).map(_.hw).getOrElse(-1L)
 }
 
 
 class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
   private val size = fetchResponse.sizeInBytes
+
   private var sent = 0
-  
-  private val buffer = ByteBuffer.allocate(16)
+
+  private val sendSize = 4 /* for size */ + size
+
+  override def complete = sent >= sendSize
+
+  private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
   buffer.putInt(size)
   buffer.putShort(fetchResponse.versionId)
-  buffer.putShort(fetchResponse.errorCode)
   buffer.putInt(fetchResponse.correlationId)
-  buffer.putInt(fetchResponse.data.length)
+  buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
   buffer.rewind()
-  
-  val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) {
-    val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes)
-  }
 
-  def complete = sent >= sendSize
+  val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
+    case(topic, data) => new TopicDataSend(TopicData(topic, data))
+  }) {
+    val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize
+  }
 
   def writeTo(channel: GatheringByteChannel):Int = {
     expectIncomplete()
@@ -220,6 +211,5 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
     sent += written
     written
   }
-
-  def sendSize = 4 + fetchResponse.sizeInBytes
 }
+
diff --git core/src/main/scala/kafka/api/ProducerRequest.scala core/src/main/scala/kafka/api/ProducerRequest.scala
index d7767b5..cac7f6c 100644
--- core/src/main/scala/kafka/api/ProducerRequest.scala
+++ core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -20,6 +20,7 @@ package kafka.api
 import java.nio._
 import kafka.message._
 import kafka.utils._
+import scala.collection.Map
 
 
 object ProducerRequest {
@@ -28,28 +29,25 @@ object ProducerRequest {
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
     val correlationId: Int = buffer.getInt
-    val clientId: String = Utils.readShortString(buffer, "UTF-8")
+    val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
     val requiredAcks: Short = buffer.getShort
     val ackTimeoutMs: Int = buffer.getInt
     //build the topic structure
     val topicCount = buffer.getInt
-    val data = new Array[TopicData](topicCount)
-    for(i <- 0 until topicCount) {
-      val topic = Utils.readShortString(buffer, "UTF-8")
-      		
+    val partitionDataPairs = (1 to topicCount).flatMap(_ => {
+      // process topic
+      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
       val partitionCount = buffer.getInt
-      //build the partition structure within this topic
-      val partitionData = new Array[PartitionData](partitionCount)
-      for (j <- 0 until partitionCount) {
+      (1 to partitionCount).map(_ => {
         val partition = buffer.getInt
         val messageSetSize = buffer.getInt
         val messageSetBuffer = new Array[Byte](messageSetSize)
         buffer.get(messageSetBuffer,0,messageSetSize)
-        partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
-      }
-      data(i) = new TopicData(topic,partitionData)
-    }
-    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
+        ((topic, partition), new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
+      })
+    })
+
+    ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*))
   }
 }
 
@@ -58,58 +56,65 @@ case class ProducerRequest( versionId: Short,
                             clientId: String,
                             requiredAcks: Short,
                             ackTimeoutMs: Int,
-                            data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                            data: Map[(String, Int), PartitionData])
+    extends RequestOrResponse(Some(RequestKeys.Produce)) {
 
-  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) =
+  /**
+   * Partitions the data into a map of maps (one for each topic).
+   */
+  private lazy val dataGroupedByTopic = data.groupBy(_._1._1)
+
+  def this(correlationId: Int,
+           clientId: String,
+           requiredAcks: Short,
+           ackTimeoutMs: Int,
+           data: Map[(String, Int), PartitionData]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     buffer.putInt(correlationId)
-    Utils.writeShortString(buffer, clientId, "UTF-8")
+    Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
     buffer.putShort(requiredAcks)
     buffer.putInt(ackTimeoutMs)
+
     //save the topic structure
-    buffer.putInt(data.size) //the number of topics
-    for(topicData <- data) {
-      Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic
-      buffer.putInt(topicData.partitionDataArray.size) //the number of partitions
-      for(partitionData <- topicData.partitionDataArray) {
-        buffer.putInt(partitionData.partition)
-        buffer.putInt(partitionData.messages.getSerialized().limit)
-        buffer.put(partitionData.messages.getSerialized())
-        partitionData.messages.getSerialized().rewind
-      }
+    buffer.putInt(dataGroupedByTopic.size) //the number of topics
+    dataGroupedByTopic.foreach {
+      case (topic, topicAndPartitionData) =>
+        Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic
+        buffer.putInt(topicAndPartitionData.size) //the number of partitions
+        topicAndPartitionData.foreach(partitionAndData => {
+          val partitionData = partitionAndData._2
+          buffer.putInt(partitionData.partition)
+          buffer.putInt(partitionData.messages.getSerialized().limit)
+          buffer.put(partitionData.messages.getSerialized())
+          partitionData.messages.getSerialized().rewind
+        })
     }
   }
 
   def sizeInBytes(): Int = {
-    var size = 0 
-    //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
-    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4
-    for(topicData <- data) {
-	    size += 2 + topicData.topic.length + 4
-      for(partitionData <- topicData.partitionDataArray) {
-        size += 4 + 4 + partitionData.messages.sizeInBytes.asInstanceOf[Int]
+    2 + /* versionId */
+    4 + /* correlationId */
+    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */
+    2 + /* requiredAcks */
+    4 + /* ackTimeoutMs */
+    4 + /* number of topics */
+    dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+      foldedTopics +
+      Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+      4 + /* the number of partitions */
+      {
+        currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {
+          foldedPartitions +
+          4 + /* partition id */
+          4 + /* byte-length of serialized messages */
+          currPartition._2.messages.sizeInBytes.toInt
+        })
       }
-    }
-    size
+    })
   }
 
-  // need to override case-class equals due to broken java-array equals()
-  override def equals(other: Any): Boolean = {
-   other match {
-      case that: ProducerRequest =>
-        ( correlationId == that.correlationId &&
-          clientId == that.clientId &&
-          requiredAcks == that.requiredAcks &&
-          ackTimeoutMs == that.ackTimeoutMs &&
-          data.toSeq == that.data.toSeq )
-      case _ => false
-    }
-  }
-
-  def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDataArray.length)
-
 }
 
diff --git core/src/main/scala/kafka/api/ProducerResponse.scala core/src/main/scala/kafka/api/ProducerResponse.scala
index dc110e0..847042d 100644
--- core/src/main/scala/kafka/api/ProducerResponse.scala
+++ core/src/main/scala/kafka/api/ProducerResponse.scala
@@ -18,6 +18,8 @@
 package kafka.api
 
 import java.nio.ByteBuffer
+import kafka.utils.Utils
+import scala.collection.Map
 import kafka.common.ErrorMapping
 
 
@@ -25,50 +27,67 @@ object ProducerResponse {
   def readFrom(buffer: ByteBuffer): ProducerResponse = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val errorCode = buffer.getShort
-    val errorsSize = buffer.getInt
-    val errors = new Array[Short](errorsSize)
-    for( i <- 0 until errorsSize) {
-      errors(i) = buffer.getShort
-    }
-    val offsetsSize = buffer.getInt
-    val offsets = new Array[Long](offsetsSize)
-    for( i <- 0 until offsetsSize) {
-      offsets(i) = buffer.getLong
-    }
-    new ProducerResponse(versionId, correlationId, errors, offsets, errorCode)
+    val topicCount = buffer.getInt
+    val statusPairs = (1 to topicCount).flatMap(_ => {
+      val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partition = buffer.getInt
+        val error = buffer.getShort
+        val offset = buffer.getLong
+        ((topic, partition), (error, offset))
+      })
+    })
+
+    ProducerResponse(versionId, correlationId, Map(statusPairs:_*))
   }
 }
 
-case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short],
-                            offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
-  val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
+
+case class ProducerResponse(versionId: Short,
+                            correlationId: Int,
+                            status: Map[(String, Int), (Short, Long)]) extends RequestOrResponse {
+
+  lazy val hasError = status.values.exists(_._1 != ErrorMapping.NoError)
+
+  private lazy val statusGroupedByTopic = status.groupBy(_._1._1)
+
+  val sizeInBytes = {
+    val groupedStatus = statusGroupedByTopic
+    2 + /* version id */
+    4 + /* correlation id */
+    4 + /* topic count */
+    groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
+      foldedTopics +
+      Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
+      4 + /* partition count for this topic */
+      currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => {
+        foldedPartitions +
+        4 + /* partition id */
+        2 + /* error code */
+        8 /* offset */
+      })
+    })
+  }
 
   def writeTo(buffer: ByteBuffer) {
-    /* version id */
+    val groupedStatus = statusGroupedByTopic
+
     buffer.putShort(versionId)
-    /* correlation id */
     buffer.putInt(correlationId)
-    /* error code */
-    buffer.putShort(errorCode)
-    /* errors */
-    buffer.putInt(errors.length)
-    errors.foreach(buffer.putShort(_))
-    /* offsets */
-    buffer.putInt(offsets.length)
-    offsets.foreach(buffer.putLong(_))
-  }
+    buffer.putInt(groupedStatus.size) // topic count
 
-  // need to override case-class equals due to broken java-array equals()
-  override def equals(other: Any): Boolean = {
-   other match {
-      case that: ProducerResponse =>
-        ( correlationId == that.correlationId &&
-          versionId == that.versionId &&
-          errorCode == that.errorCode &&
-          errors.toSeq == that.errors.toSeq &&
-          offsets.toSeq == that.offsets.toSeq)
-      case _ => false
-    }
+    groupedStatus.foreach(topicStatus => {
+      val (topic, errorsAndOffsets) = topicStatus
+      Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
+      buffer.putInt(errorsAndOffsets.size) // partition count
+      errorsAndOffsets.foreach(partitionStatus => {
+        val (partitionId, error, offset) = (partitionStatus._1._2, partitionStatus._2._1, partitionStatus._2._2)
+        buffer.putInt(partitionId)
+        buffer.putShort(error)
+        buffer.putLong(offset)
+      })
+    })
   }
-}
\ No newline at end of file
+}
+
diff --git core/src/main/scala/kafka/api/RequestOrResponse.scala core/src/main/scala/kafka/api/RequestOrResponse.scala
index ac5b64e..611bb42 100644
--- core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -19,6 +19,12 @@ package kafka.api
 
 import java.nio._
 
+
+object RequestOrResponse {
+  val DefaultCharset = "UTF-8"
+}
+
+
 private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
 
   def sizeInBytes: Int
diff --git core/src/main/scala/kafka/common/ErrorMapping.scala core/src/main/scala/kafka/common/ErrorMapping.scala
index 54312c0..48a14bf 100644
--- core/src/main/scala/kafka/common/ErrorMapping.scala
+++ core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -47,7 +47,6 @@ object ErrorMapping {
       classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
       classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode,
       classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
-      classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
diff --git core/src/main/scala/kafka/common/FetchRequestFormatException.scala core/src/main/scala/kafka/common/FetchRequestFormatException.scala
deleted file mode 100644
index 0bc7d4e..0000000
--- core/src/main/scala/kafka/common/FetchRequestFormatException.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.common
-
-class FetchRequestFormatException(val message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
diff --git core/src/main/scala/kafka/javaapi/FetchResponse.scala core/src/main/scala/kafka/javaapi/FetchResponse.scala
index 3d0928a..55d343a 100644
--- core/src/main/scala/kafka/javaapi/FetchResponse.scala
+++ core/src/main/scala/kafka/javaapi/FetchResponse.scala
@@ -17,14 +17,14 @@
 
 package kafka.javaapi
 
-import kafka.api.TopicData
+import kafka.api.PartitionData
 
 
 class FetchResponse( val versionId: Short,
                      val correlationId: Int,
-                     private val data: Array[TopicData] ) {
+                     private val data: Map[(String, Int), PartitionData] ) {
 
-  private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
+  private val underlying = kafka.api.FetchResponse(versionId, correlationId, data)
 
   def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
     import Implicits._
diff --git core/src/main/scala/kafka/javaapi/ProducerRequest.scala core/src/main/scala/kafka/javaapi/ProducerRequest.scala
index 77e07d7..4658291 100644
--- core/src/main/scala/kafka/javaapi/ProducerRequest.scala
+++ core/src/main/scala/kafka/javaapi/ProducerRequest.scala
@@ -16,15 +16,16 @@
  */
 package kafka.javaapi
 
-import kafka.api.RequestOrResponse
-import kafka.api.{RequestKeys, TopicData}
 import java.nio.ByteBuffer
+import kafka.api.{PartitionData, RequestOrResponse, RequestKeys}
+import scala.collection.Map
 
 class ProducerRequest(val correlationId: Int,
                       val clientId: String,
                       val requiredAcks: Short,
                       val ackTimeoutMs: Int,
-                      val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                      val data: Map[(String, Int), PartitionData])
+    extends RequestOrResponse(Some(RequestKeys.Produce)) {
 	
   val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 
diff --git core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
index c1ff168..99ff40c 100644
--- core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
+++ core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
@@ -18,7 +18,7 @@ package kafka.javaapi.producer
 
 import kafka.producer.SyncProducerConfig
 import kafka.javaapi.message.ByteBufferMessageSet
-import kafka.api.{ProducerResponse, PartitionData, TopicData}
+import kafka.api.{ProducerResponse, PartitionData}
 
 class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
 
@@ -31,8 +31,7 @@ class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
   }
 
   def send(topic: String, messages: ByteBufferMessageSet): ProducerResponse = {
-    val partitionData = Array[PartitionData]( new PartitionData(-1, messages.underlying) )
-    val data = Array[TopicData]( new TopicData(topic, partitionData) )
+    val data = Map((topic, -1) -> new PartitionData(-1, messages.underlying))
     val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data)
     underlying.send(producerRequest)
   }
diff --git core/src/main/scala/kafka/producer/SyncProducer.scala core/src/main/scala/kafka/producer/SyncProducer.scala
index 064b7f4..7083fdd 100644
--- core/src/main/scala/kafka/producer/SyncProducer.scala
+++ core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -101,13 +101,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    for( topicData <- producerRequest.data ) {
-      for( partitionData <- topicData.partitionDataArray ) {
-	      verifyMessageSize(partitionData.messages)
-        val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
-        trace("Got message set with " + setSize + " bytes to send")
-      }
-    }
+    producerRequest.data.foreach( topicAndPartitionData => {
+      val partitionData = topicAndPartitionData._2
+      verifyMessageSize(partitionData.messages)
+      val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
+      trace("Got message set with " + setSize + " bytes to send")
+    })
     val response = doSend(producerRequest)
     ProducerResponse.readFrom(response.buffer)
   }
diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d2e9529..7d759ef 100644
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -22,9 +22,9 @@ import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet}
 import kafka.producer._
 import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging}
-import scala.collection.Map
+import scala.collection.{Seq, Map}
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData}
+import kafka.api.{TopicMetadata, ProducerRequest, PartitionData}
 
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -172,30 +172,25 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
    */
   private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Seq[(String, Int)] = {
     if(brokerId < 0) {
-      warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
+      warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
       messagesPerTopic.keys.toSeq
     } else if(messagesPerTopic.size > 0) {
-      val topics = new HashMap[String, ListBuffer[PartitionData]]()
-      for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) {
-        val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]())
-        partitionData.append(new PartitionData(partitionId, messagesSet))
-      }
-      val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
+      val topicPartitionDataPairs = messagesPerTopic.toSeq.map(topicAndMessages => {
+        val (topic, partitionId) = topicAndMessages._1
+        ((topic, partitionId), new PartitionData(partitionId, topicAndMessages._2))
+      })
       val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
-        config.requestTimeoutMs, topicData)
+        config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))
       try {
         val syncProducer = producerPool.getProducer(brokerId)
         val response = syncProducer.send(producerRequest)
-        trace("producer sent messages for topics %s to broker %d on %s:%d"
+        trace("Producer sent messages for topics %s to broker %d on %s:%d"
           .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
-        var msgIdx = -1
-        val errors = new ListBuffer[(String, Int)]
-        for( topic <- topicData; partition <- topic.partitionDataArray ) {
-          msgIdx += 1
-          if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
-            errors.append((topic.topic, partition.partition))
-        }
-        errors
+        if (response.status.size != producerRequest.data.size)
+          throw new KafkaException("Incomplete response (%s) for producer request (%s)"
+                                           .format(response, producerRequest))
+        response.status.filter(_._2._1 != ErrorMapping.NoError).toSeq
+          .map(partitionStatus => partitionStatus._1)
       } catch {
         case e =>
           warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), e)
diff --git core/src/main/scala/kafka/server/AbstractFetcherThread.scala core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1a11d96..05a0f06 100644
--- core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -83,34 +83,32 @@ abstract class  AbstractFetcherThread(name: String, sourceBroker: Broker, socket
     if (response != null) {
       // process fetched data
       fetchMapLock synchronized {
-        for ( topicData <- response.data ) {
-          for ( partitionData <- topicData.partitionDataArray) {
-            val topic = topicData.topic
-            val partitionId = partitionData.partition
-            val key = (topic, partitionId)
-            val currentOffset = fetchMap.get(key)
-            if (currentOffset.isDefined) {
-              partitionData.error match {
-                case ErrorMapping.NoError =>
-                  processPartitionData(topic, currentOffset.get, partitionData)
-                  val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
-                  fetchMap.put(key, newOffset)
-                case ErrorMapping.OffsetOutOfRangeCode =>
-                  val newOffset = handleOffsetOutOfRange(topic, partitionId)
-                  fetchMap.put(key, newOffset)
-                  warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                               .format(currentOffset.get, topic, partitionId, newOffset))
-                case _ =>
-                  error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
-                        ErrorMapping.exceptionFor(partitionData.error))
-                  partitionsWithError += key
-                  fetchMap.remove(key)
-              }
+        response.data.foreach(data => {
+          val (key, partitionData) = (data._1, data._2)
+          val (topic, partitionId) = key
+          val currentOffset = fetchMap.get(key)
+          if (currentOffset.isDefined) {
+            partitionData.error match {
+              case ErrorMapping.NoError =>
+                processPartitionData(topic, currentOffset.get, partitionData)
+                val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+                fetchMap.put(key, newOffset)
+              case ErrorMapping.OffsetOutOfRangeCode =>
+                val newOffset = handleOffsetOutOfRange(topic, partitionId)
+                fetchMap.put(key, newOffset)
+                warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+                             .format(currentOffset.get, topic, partitionId, newOffset))
+              case _ =>
+                error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
+                      ErrorMapping.exceptionFor(partitionData.error))
+                partitionsWithError += key
+                fetchMap.remove(key)
             }
           }
-        }
+        })
       }
     }
+
     if (partitionsWithError.size > 0) {
       debug("handling partitions with error for %s".format(partitionsWithError))
       handlePartitionsWithErrors(partitionsWithError)
diff --git core/src/main/scala/kafka/server/KafkaApis.scala core/src/main/scala/kafka/server/KafkaApis.scala
index d9cd6c6..9d4565e 100644
--- core/src/main/scala/kafka/server/KafkaApis.scala
+++ core/src/main/scala/kafka/server/KafkaApis.scala
@@ -41,13 +41,12 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val zkClient: ZkClient,
                 brokerId: Int) extends Logging {
 
-  private val metricsGroup = brokerId.toString
-  private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
-  private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
+  private val producerRequestPurgatory = new ProducerRequestPurgatory
+  private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
   private val delayedRequestMetrics = new DelayedRequestMetrics
 
   private val requestLogger = Logger.getLogger("kafka.request.logger")
-  this.logIdent = "[KafkaApi on Broker " + brokerId + "], "
+  this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
   /**
    * Top-level method that handles all requests and multiplexes to the right api
@@ -94,18 +93,18 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   /**
-   * Check if the partitionDataArray from a produce request can unblock any
+   * Check if a partitionData from a produce request can unblock any
    * DelayedFetch requests.
    */
-  def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
-    var satisfied = new mutable.ArrayBuffer[DelayedFetch]
-    for(partitionData <- partitionDatas)
-      satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null)
-    trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size))
+  def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionData) {
+    val partition = partitionData.partition
+    val satisfied =  fetchRequestPurgatory.update(RequestKey(topic, partition), null)
+    trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
+
     // send any newly unblocked responses
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
-      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+      val response = FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
 
       val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId
       delayedRequestMetrics.recordDelayedFetchSatisfied(
@@ -125,28 +124,23 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestLogger.trace("Handling producer request " + request.toString)
     trace("Handling producer request " + request.toString)
 
-    val response = produceToLocalLog(produceRequest)
+    val localProduceResponse = produceToLocalLog(produceRequest)
     debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
-    
-    for (topicData <- produceRequest.data)
-      maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
-    
+
+    produceRequest.data.foreach(partitionAndData =>
+      maybeUnblockDelayedFetchRequests(partitionAndData._1._1, partitionAndData._2))
+
     if (produceRequest.requiredAcks == 0 ||
         produceRequest.requiredAcks == 1 ||
         produceRequest.data.size <= 0)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(localProduceResponse)))
     else {
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val producerRequestKeys = produceRequest.data.flatMap(topicData => {
-        val topic = topicData.topic
-        topicData.partitionDataArray.map(partitionData => {
-          RequestKey(topic, partitionData.partition)
-        })
-      })
+      val producerRequestKeys = produceRequest.data.keys.map(
+        topicAndPartition => RequestKey(topicAndPartition._1, topicAndPartition._2)).toSeq
 
       val delayedProduce = new DelayedProduce(
-        producerRequestKeys, request,
-        response.errors, response.offsets,
+        producerRequestKeys, request, localProduceResponse,
         produceRequest, produceRequest.ackTimeoutMs.toLong)
       producerRequestPurgatory.watch(delayedProduce)
 
@@ -170,43 +164,44 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
     trace("Produce [%s] to local log ".format(request.toString))
-    val requestSize = request.topicPartitionCount
-    val errors = new Array[Short](requestSize)
-    val offsets = new Array[Long](requestSize)
-
-    var msgIndex = -1
-    for(topicData <- request.data) {
-      for(partitionData <- topicData.partitionDataArray) {
-        msgIndex += 1
-        BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
-        BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
-        try {
-          val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
-          val log = localReplica.log.get
-          log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
-          // we may need to increment high watermark since ISR could be down to 1
-          localReplica.partition.maybeIncrementLeaderHW(localReplica)
-          offsets(msgIndex) = log.logEndOffset
-          errors(msgIndex) = ErrorMapping.NoError.toShort
-          trace("%d bytes written to logs, nextAppendOffset = %d"
-            .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
-        } catch {
-          case e =>
-            BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
-            BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-            error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
-            e match {
-              case _: IOException =>
-                fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
-                System.exit(1)
-              case _ =>
-                errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort
-                offsets(msgIndex) = -1
-            }
-        }
+
+    val localErrorsAndOffsets = request.data.map (topicAndPartitionData => {
+      val (topic, partitionData) = (topicAndPartitionData._1._1, topicAndPartitionData._2)
+      val messagesSize = partitionData.messages.sizeInBytes
+      // Record the incoming byte rate exactly once per partition; an
+      // earlier revision of this change recorded it twice here, which
+      // double-counted the bytes-in metrics for every produce request.
+      BrokerTopicStat.getBrokerTopicStat(topic).recordBytesIn(messagesSize)
+      BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(messagesSize)
+      try {
+        val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partitionData.partition)
+        val log = localReplica.log.get
+        log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+        // we may need to increment high watermark since ISR could be down to 1
+        localReplica.partition.maybeIncrementLeaderHW(localReplica)
+        val (error, offset) = (ErrorMapping.NoError.toShort, log.logEndOffset)
+        trace("%d bytes written to logs, nextAppendOffset = %d"
+                      .format(partitionData.messages.sizeInBytes, offset))
+        ((topic, partitionData.partition), (error, offset))
+      } catch {
+        case e: Throwable =>
+          BrokerTopicStat.getBrokerTopicStat(topic).recordFailedProduceRequest
+          BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+          error("Error processing ProducerRequest on %s:%d".format(topic, partitionData.partition), e)
+          e match {
+            case _: IOException =>
+              fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+              // use scala exit: it returns Nothing, so this branch stays type-compatible with the tuple branch below.
+              exit(1)
+            case _ =>
+              val (error, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort, -1L)
+              ((topic, partitionData.partition), (error, offset))
+          }
       }
     }
-    new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
+    )
+
+    ProducerResponse(request.versionId, request.correlationId, localErrorsAndOffsets)
   }
 
   /**
@@ -218,25 +213,13 @@ class KafkaApis(val requestChannel: RequestChannel,
       requestLogger.trace("Handling fetch request " + fetchRequest.toString)
     trace("Handling fetch request " + fetchRequest.toString)
 
-    // validate the request
-    try {
-      fetchRequest.validate()
-    } catch {
-      case e:FetchRequestFormatException =>
-        val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty)
-        val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response))
-        requestChannel.sendResponse(channelResponse)
-    }
-
     if(fetchRequest.replicaId != FetchRequest.NonFollowerId) {
       maybeUpdatePartitionHW(fetchRequest)
       // after updating HW, some delayed produce requests may be unblocked
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
-      fetchRequest.offsetInfo.foreach(topicOffsetInfo => {
-        topicOffsetInfo.partitions.foreach(partition => {
-          val key = RequestKey(topicOffsetInfo.topic, partition)
-          satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
-        })
+      fetchRequest.requestInfo.foreach(partitionOffsetInfo => {
+        val key = RequestKey(partitionOffsetInfo._1._1, partitionOffsetInfo._1._2)
+        satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
       })
       debug("Replica %d fetch unblocked %d producer requests."
         .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
@@ -250,13 +233,13 @@ class KafkaApis(val requestChannel: RequestChannel,
        fetchRequest.numPartitions <= 0) {
       val topicData = readMessageSets(fetchRequest)
       debug("Returning fetch response %s for fetch request with correlation id %d".format(
-        topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
-      val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
+        topicData.values.map(_.error).mkString(","), fetchRequest.correlationId))
+      val response = FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
       requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
     } else {
       debug("Putting fetch request into purgatory")
       // create a list of (topic, partition) pairs to use as keys for this delayed request
-      val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _)))
+      val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(k => RequestKey(k._1, k._2))
       val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait)
       fetchRequestPurgatory.watch(delayedFetch)
     }
@@ -266,59 +249,52 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Calculate the number of available bytes for the given fetch request
    */
   private def availableFetchBytes(fetchRequest: FetchRequest): Long = {
-    var totalBytes = 0L
-    for(offsetDetail <- fetchRequest.offsetInfo) {
-      for(i <- 0 until offsetDetail.partitions.size) {
+    val totalBytes = fetchRequest.requestInfo.foldLeft(0L)((folded, curr) => {
+      folded +
+      {
+        val (topic, partition, offset, fetchSize) = (curr._1._1, curr._1._2, curr._2.offset, curr._2.fetchSize)
         try {
-          val localReplica = replicaManager.getReplica(offsetDetail.topic, offsetDetail.partitions(i))
+          val localReplica = replicaManager.getReplica(topic, partition)
           val available = localReplica match {
-            case Some(replica) => max(0, replica.log.get.logEndOffset - offsetDetail.offsets(i))
+            case Some(replica) => max(0, replica.log.get.logEndOffset - offset)
             case None => 0
           }
-          totalBytes += math.min(offsetDetail.fetchSizes(i), available)
+          math.min(fetchSize, available)
         } catch {
-          case e: UnknownTopicOrPartitionException =>
-            info("Invalid partition %d in fetch request from client %d."
-              .format(offsetDetail.partitions(i), fetchRequest.clientId))
+          case e: UnknownTopicOrPartitionException => {
+            info("Invalid partition %d in fetch request from client %s.".format(partition, fetchRequest.clientId))
+            0
+          }
         }
       }
-    }
+    })
     trace(totalBytes + " available bytes for fetch request.")
     totalBytes
   }
 
   private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
-    val offsets = fetchRequest.offsetInfo
-    debug("Act on update partition HW, check offset detail: %s ".format(offsets))
-    for(offsetDetail <- offsets) {
-      val topic = offsetDetail.topic
-      val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
-      for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
-        replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
-      }
-    }
+    debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
+    fetchRequest.requestInfo.foreach(info => {
+      val (topic, partition, offset) = (info._1._1, info._1._2, info._2.offset)
+      replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
+    })
   }
 
   /**
-   * Read from all the offset details given and produce an array of topic datas
+   * Read from all the offset details given and return a map of
+   * (topic, partition) -> PartitionData
    */
-  private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = {
-    val offsets = fetchRequest.offsetInfo
-    val fetchedData = new mutable.ArrayBuffer[TopicData]()
-
-    for(offsetDetail <- offsets) {
-      val info = new mutable.ArrayBuffer[PartitionData]()
-      val topic = offsetDetail.topic
-      val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
-      for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
-        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
+  private def readMessageSets(fetchRequest: FetchRequest) = {
+    fetchRequest.requestInfo.map {
+      case ((topic, partition), fetchInfo) => {
+        val partitionData = readMessageSet(topic, partition, fetchInfo.offset, fetchInfo.fetchSize) match {
           case Left(err) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
             fetchRequest.replicaId match {
-              case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+              case -1 => new PartitionData(partition, err, fetchInfo.offset, -1L, MessageSet.Empty)
               case _ =>
-                new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+                new PartitionData(partition, err, fetchInfo.offset, -1L, MessageSet.Empty)
             }
           case Right(messages) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
@@ -326,17 +302,15 @@ class KafkaApis(val requestChannel: RequestChannel,
             val leaderReplica = replicaManager.getReplica(topic, partition).get
             if (fetchRequest.replicaId != FetchRequest.NonFollowerId) {
               debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
-                .format(topic, partition, fetchRequest.replicaId))
+                            .format(topic, partition, fetchRequest.replicaId))
               debug("Leader returning %d messages for topic %s partition %d to follower %d"
-                .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+                            .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
             }
-            new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages)
+            new PartitionData(partition, ErrorMapping.NoError, fetchInfo.offset, leaderReplica.highWatermark, messages)
         }
-        info.append(partitionInfo)
+        ((topic, partition), partitionData)
       }
-      fetchedData.append(new TopicData(topic, info.toArray))
     }
-    fetchedData.toArray
   }
 
   /**
@@ -447,6 +421,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   private [kafka] case class RequestKey(topic: String, partition: Int)
           extends MetricKey {
     override def keyLabel = "%s-%d".format(topic, partition)
+    def topicPartition = (topic, partition)
   }
   /**
    * A delayed fetch request
@@ -457,9 +432,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * A holding pen for fetch requests waiting to be satisfied
    */
-  class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
+  class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
 
-    this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
+    this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
 
 
     /**
@@ -473,7 +448,7 @@ class KafkaApis(val requestChannel: RequestChannel,
      */
     def expire(delayed: DelayedFetch) {
       val topicData = readMessageSets(delayed.fetch)
-      val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+      val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
       val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId
       delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response)
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
@@ -482,48 +457,44 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   class DelayedProduce(keys: Seq[RequestKey],
                        request: RequestChannel.Request,
-                       localErrors: Array[Short],
-                       requiredOffsets: Array[Long],
+                       localProduceResponse: ProducerResponse,
                        val produce: ProducerRequest,
                        delayMs: Long)
           extends DelayedRequest(keys, request, delayMs) with Logging {
 
+    private val initialErrorsAndOffsets = localProduceResponse.status
     /**
      * Map of (topic, partition) -> partition status
      * The values in this map don't need to be synchronized since updates to the
      * values are effectively synchronized by the ProducerRequestPurgatory's
      * update method
      */
-    private [kafka] val partitionStatus = keys.map(key => {
-      val keyIndex = keys.indexOf(key)
+    private [kafka] val partitionStatus = keys.map(requestKey => {
+      val errorAndOffset = initialErrorsAndOffsets(requestKey.topicPartition)
       // if there was an error in writing to the local replica's log, then don't
       // wait for acks on this partition
-      val acksPending =
-        if (localErrors(keyIndex) == ErrorMapping.NoError) {
+      val (acksPending, error, offset) =
+        if (errorAndOffset._1 == ErrorMapping.NoError) {
           // Timeout error state will be cleared when requiredAcks are received
-          localErrors(keyIndex) = ErrorMapping.RequestTimedOutCode
-          true
+          (true, ErrorMapping.RequestTimedOutCode, errorAndOffset._2)
         }
-        else
-          false
+        else (false, errorAndOffset._1, errorAndOffset._2)
 
-      val initialStatus = new PartitionStatus(acksPending, localErrors(keyIndex), requiredOffsets(keyIndex))
-      trace("Initial partition status for %s = %s".format(key, initialStatus))
-      (key, initialStatus)
+      val initialStatus = new PartitionStatus(acksPending, error, offset)
+      trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
+      (requestKey, initialStatus)
     }).toMap
 
-
     def respond() {
-      val errorsAndOffsets: (List[Short], List[Long]) = (
-        keys.foldRight
-          ((List[Short](), List[Long]()))
-          ((key: RequestKey, result: (List[Short], List[Long])) => {
-            val status = partitionStatus(key)
-            (status.error :: result._1, status.requiredOffset :: result._2)
-          })
-        )
-      val response = new ProducerResponse(produce.versionId, produce.correlationId,
-                                          errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
+      
+      val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
+        status => {
+          val (topic, partition) = status._1
+          val pstat = partitionStatus(RequestKey(topic, partition))
+          (status._1, (pstat.error, pstat.requiredOffset))
+        })
+      
+      val response = ProducerResponse(produce.versionId, produce.correlationId, finalErrorsAndOffsets)
 
       requestChannel.sendResponse(new RequestChannel.Response(
         request, new BoundedByteBufferSend(response)))
@@ -559,12 +530,11 @@ class KafkaApis(val requestChannel: RequestChannel,
           fetchPartitionStatus.error = ErrorMapping.NoError
         }
         if (!fetchPartitionStatus.acksPending) {
-          val topicData = produce.data.find(_.topic == topic).get
-          val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get
+          val partitionData = produce.data((topic, partitionId))
           delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
                                                                  durationNs,
-                                                                 partitionData.sizeInBytes)
-          maybeUnblockDelayedFetchRequests(topic, Array(partitionData))
+                                                                 partitionData.messages.sizeInBytes.toInt)
+          maybeUnblockDelayedFetchRequests(topic, partitionData)
         }
       }
 
@@ -593,9 +563,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * A holding pen for produce requests waiting to be satisfied.
    */
-  private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
+  private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
 
-    this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId)
+    this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
 
     protected def checkSatisfied(followerFetchRequestKey: RequestKey,
                                  delayedProduce: DelayedProduce) =
@@ -705,16 +675,15 @@ class KafkaApis(val requestChannel: RequestChannel,
         else aggregateNonFollowerFetchRequestMetrics
       metrics.throughputMeter.mark(response.sizeInBytes)
 
-      response.topicMap.foreach(topicAndData => {
-        val topic = topicAndData._1
-        topicAndData._2.partitionDataArray.foreach(partitionData => {
-          val key = RequestKey(topic, partitionData.partition)
-          val keyMetrics = if (forFollower)
-            followerFetchRequestMetricsForKey.getAndMaybePut(key)
-          else
-            nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key)
-          keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
-        })
+      response.data.foreach(topicAndData => {
+        val topic = topicAndData._1._1
+        val partitionData = topicAndData._2
+        val key = RequestKey(topic, partitionData.partition)
+        val keyMetrics = if (forFollower)
+          followerFetchRequestMetricsForKey.getAndMaybePut(key)
+        else
+          nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key)
+        keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
       })
     }
 
diff --git core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 1a6be9f..8810216 100644
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -19,7 +19,7 @@ package kafka.integration
 
 import java.nio.ByteBuffer
 import junit.framework.Assert._
-import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
+import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
 import kafka.server.{KafkaRequestHandler, KafkaConfig}
 import java.util.Properties
 import kafka.producer.{ProducerData, Producer, ProducerConfig}
@@ -31,7 +31,7 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import scala.collection._
-import kafka.common.{ErrorMapping, UnknownTopicOrPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
 import kafka.admin.{AdminUtils, CreateTopicCommand}
 
 /**
@@ -77,27 +77,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
     assertEquals(request, deserializedRequest)
   }
 
-  def testFetchRequestEnforcesUniqueTopicsForOffsetDetails() {
-    val offsets = Array(
-      new OffsetDetail("topic1", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
-      new OffsetDetail("topic2", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
-      new OffsetDetail("topic1", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
-      new OffsetDetail("topic2", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000))
-    )
-    val request = new FetchRequest(offsetInfo = offsets)
-    try {
-      consumer.fetch(request)
-      fail("FetchRequest should throw FetchRequestFormatException due to duplicate topics")
-    } catch {
-      case e: FetchRequestFormatException => "success"
-    }
-  }
-
   def testEmptyFetchRequest() {
-    val offsets = Array[OffsetDetail]()
-    val request = new FetchRequest(offsetInfo = offsets)
+    val partitionRequests = immutable.Map[(String, Int), PartitionFetchInfo]()
+    val request = new FetchRequest(requestInfo = partitionRequests)
     val fetched = consumer.fetch(request)
-    assertTrue(fetched.errorCode == ErrorMapping.NoError && fetched.data.size == 0)
+    assertTrue(!fetched.hasError && fetched.data.size == 0)
   }
 
   def testDefaultEncoderProducerAndFetch() {
diff --git core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
index 16057dc..e2c506c 100644
--- core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
+++ core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.network;
+package kafka.network
 
 import org.junit._
 import org.scalatest.junit.JUnitSuite
@@ -27,28 +27,42 @@ import kafka.cluster.Broker
 import kafka.common.ErrorMapping
 import collection.mutable._
 
+
 object RpcDataSerializationTestUtils{
   private val topic1 = "test1"
   private val topic2 = "test2"
-  private val leader1 = 0;
+  private val leader1 = 0
   private val isr1 = List(0, 1, 2)
-  private val leader2 = 0;
+  private val leader2 = 0
   private val isr2 = List(0, 2, 3)
   private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
   private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
   private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
   private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
   private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3)
-  private val topicData1 = new TopicData(topic1, partitionDataArray)
-  private val topicData2 = new TopicData(topic2, partitionDataArray)
-  private val topicDataArray = Array(topicData1, topicData2)
-  private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
-  private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
-  private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2)
-  private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
-  private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
-  private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
-  private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
+
+  private val topicData = {
+    val groupedData = Array(topic1, topic2).flatMap(topic =>
+      partitionDataArray.map(partitionData =>
+        ((topic, partitionData.partition), partitionData)))
+    collection.immutable.Map(groupedData:_*)
+  }
+
+  private val requestInfos = collection.immutable.Map(
+    (topic1, 0) -> PartitionFetchInfo(1000, 100),
+    (topic1, 1) -> PartitionFetchInfo(2000, 100),
+    (topic1, 2) -> PartitionFetchInfo(3000, 100),
+    (topic1, 3) -> PartitionFetchInfo(4000, 100),
+    (topic2, 0) -> PartitionFetchInfo(1000, 100),
+    (topic2, 1) -> PartitionFetchInfo(2000, 100),
+    (topic2, 2) -> PartitionFetchInfo(3000, 100),
+    (topic2, 3) -> PartitionFetchInfo(4000, 100)
+  )
+
+  private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+  private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
   private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
   private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
   private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
@@ -78,19 +92,21 @@ object RpcDataSerializationTestUtils{
   }
 
   def createTestProducerRequest: ProducerRequest = {
-    new ProducerRequest(1, "client 1", 0, 1000, topicDataArray)
+    new ProducerRequest(1, "client 1", 0, 1000, topicData)
   }
 
-  def createTestProducerResponse: ProducerResponse = {
-    new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000l, 2000l), 0)
-  }
+  def createTestProducerResponse: ProducerResponse =
+    ProducerResponse(1, 1, Map(
+      (topic1, 0) -> (0.toShort, 10001),
+      (topic2, 0) -> (0.toShort, 20001)
+    ))
 
   def createTestFetchRequest: FetchRequest = {
-    new FetchRequest(offsetInfo = offsetDetailSeq)
+    new FetchRequest(requestInfo = requestInfos)
   }
 
   def createTestFetchResponse: FetchResponse = {
-    new FetchResponse(1, 1, topicDataArray)
+    FetchResponse(1, 1, topicData)
   }
 
   def createTestOffsetRequest: OffsetRequest = {
diff --git core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index b21712d..64d441c 100644
--- core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -409,9 +409,12 @@ class AsyncProducerTest extends JUnit3Suite {
     // On the third try for partition 0, let it succeed.
     val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0)
     val response1 =
-      new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
+      ProducerResponse(ProducerRequest.CurrentVersion, 0,
+                       Map((("topic1", 0), (ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
+                           (("topic1", 1), (ErrorMapping.NoError, 0L))))
     val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs))
-    val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
+    val response2 = ProducerResponse(
+      ProducerRequest.CurrentVersion, 0, Map((("topic1", 0), (ErrorMapping.NoError, 0L))))
     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
     EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
     EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)
diff --git core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index 068bf7c..7d1383b 100644
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -28,7 +28,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.TopicData
+import kafka.api.PartitionData
 
 class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
   private var messageBytes =  new Array[Byte](2);
@@ -85,11 +85,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
     val ack = SyncProducerConfig.DefaultRequiredAcks
-    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+    val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[(String, Int), PartitionData]())
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val response = producer.send(emptyRequest)
-    Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0)
+    Assert.assertTrue(!response.hasError && response.status.size == 0)
   }
 
   @Test
@@ -152,10 +152,12 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
 
     Assert.assertNotNull(response)
     Assert.assertEquals(request.correlationId, response.correlationId)
-    Assert.assertEquals(response.errors.length, response.offsets.length)
-    Assert.assertEquals(3, response.errors.length)
-    response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, _))
-    response.offsets.foreach(Assert.assertEquals(-1L, _))
+    Assert.assertEquals(3, response.status.size)
+    response.status.foreach(partitionStatus => {
+      val (error, offset) = partitionStatus._2
+      Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, error)
+      Assert.assertEquals(-1L, offset)
+    })
 
     // #2 - test that we get correct offsets when partition is owned by broker
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
@@ -166,18 +168,17 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
     val response2 = producer.send(request)
     Assert.assertNotNull(response2)
     Assert.assertEquals(request.correlationId, response2.correlationId)
-    Assert.assertEquals(response2.errors.length, response2.offsets.length)
-    Assert.assertEquals(3, response2.errors.length)
+    Assert.assertEquals(3, response2.status.size)
 
     // the first and last message should have been accepted by broker
-    Assert.assertEquals(0, response2.errors(0))
-    Assert.assertEquals(0, response2.errors(2))
-    Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
-    Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
+    Assert.assertEquals(0, response2.status("topic1", 0)._1)
+    Assert.assertEquals(0, response2.status("topic3", 0)._1)
+    Assert.assertEquals(messages.sizeInBytes, response2.status("topic1", 0)._2)
+    Assert.assertEquals(messages.sizeInBytes, response2.status("topic3", 0)._2)
 
     // the middle message should have been rejected because broker doesn't lead partition
-    Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1))
-    Assert.assertEquals(-1, response2.offsets(1))
+    Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.status("topic2", 0)._1)
+    Assert.assertEquals(-1, response2.status("topic2", 0)._2)
   }
 
   @Test
diff --git core/src/test/scala/unit/kafka/utils/TestUtils.scala core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bbaa7e8..b0012c5 100644
--- core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -33,9 +33,8 @@ import collection.mutable.ListBuffer
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
-import kafka.common.ErrorMapping
 import kafka.api._
-import collection.mutable.{Map, Set}
+import collection.mutable.Map
 import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 
 
@@ -366,8 +365,10 @@ object TestUtils extends Logging {
     val correlationId = SyncProducerConfig.DefaultCorrelationId
     val clientId = SyncProducerConfig.DefaultClientId
     val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
-    val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
-    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, data.toArray)
+    val data = topics.flatMap(topic =>
+      partitions.map(partition => ((topic, partition), new PartitionData(partition, message)))
+    )
+    new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
   }
 
   def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
@@ -382,10 +383,7 @@ object TestUtils extends Logging {
     val clientId = "test"
     val requiredAcks: Short = 0
     val ackTimeoutMs = 0
-    var data = new Array[TopicData](1)
-    var partitionData = new Array[PartitionData](1)
-    partitionData(0) = new PartitionData(partition,message.underlying)
-    data(0) = new TopicData(topic,partitionData)
+    val data = Map(("topic", 0) -> new PartitionData(partition,message.underlying))
     val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
     pr
   }
