Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1379645)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -189,8 +189,7 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics)
-          response.messageSet(topic, partition).iterator
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case e: OffsetOutOfRangeException => "this is good"
@@ -206,9 +205,7 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics) {
-          response.messageSet(topic, -1).iterator
-        }
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: UnknownTopicOrPartitionException => "this is good"
@@ -256,8 +253,7 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics)
-          response.messageSet(topic, partition).iterator
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case e: OffsetOutOfRangeException => "this is good"
@@ -273,8 +269,7 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, _) <- topics)
-          response.messageSet(topic, -1).iterator
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: UnknownTopicOrPartitionException => "this is good"
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(revision 1379645)
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(working copy)
@@ -25,7 +25,7 @@
 import scala.collection._
 import kafka.producer.ProducerData
 import kafka.utils.TestUtils
-import kafka.common.{KafkaException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -72,7 +72,7 @@
     // send an invalid offset
     try {
       val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
-      fetchedWithError.messageSet(topic, 0).iterator
+      fetchedWithError.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
       fail("Expected an OffsetOutOfRangeException exception to be thrown")
     } catch {
       case e: OffsetOutOfRangeException => 
@@ -109,9 +109,9 @@
 
       val request = builder.build()
       val responses = consumer.fetch(request)
-      for( (topic, offset) <- topicOffsets ) {
+      for(topicData <- responses.data) {
         try {
-          responses.messageSet(topic, offset).iterator
+          topicData.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
           fail("Expected an OffsetOutOfRangeException exception to be thrown")
         } catch {
           case e: OffsetOutOfRangeException =>
Index: core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala	(revision 1379645)
+++ core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala	(working copy)
@@ -33,7 +33,7 @@
     // create a ByteBufferMessageSet that doesn't contain a full message
     // iterating it should get an InvalidMessageSizeException
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
-    val buffer = messages.getSerialized().slice
+    val buffer = messages.buffer.slice
     buffer.limit(10)
     val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
     try {
@@ -51,7 +51,7 @@
     {
       val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
       val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
-      buffer.put(messages.getSerialized())
+      buffer.put(messages.buffer)
       buffer.putShort(4)
       val messagesPlus = new ByteBufferMessageSet(buffer)
       assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
@@ -65,6 +65,18 @@
   }
 
   @Test
+  def testValidBytesWithCompression() {
+    {
+      val messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+      val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
+      buffer.put(messages.buffer)
+      buffer.putShort(4)
+      val messagesPlus = new ByteBufferMessageSet(buffer)
+      assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
+    }
+  }
+
+  @Test
   def testEquals() {
     var messages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
     var moreMessages = new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
@@ -93,7 +105,7 @@
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
+      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit)
 
       //make sure shallow iterator is the same as deep iterator
       TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
@@ -107,7 +119,7 @@
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
+      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.buffer.limit)
 
       verifyShallowIterator(messageSet)
     }
@@ -117,16 +129,16 @@
       val emptyMessageList : List[Message] = Nil
       val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
       val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
-      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
-      buffer.put(emptyMessageSet.getSerialized())
-      buffer.put(regularMessgeSet.getSerialized())
+      val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+      buffer.put(emptyMessageSet.buffer)
+      buffer.put(regularMessgeSet.buffer)
       buffer.rewind
       val mixedMessageSet = new ByteBufferMessageSet(buffer)
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
+      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit)
 
       //make sure shallow iterator is the same as deep iterator
       TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
@@ -138,16 +150,16 @@
       val emptyMessageList : List[Message] = Nil
       val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
       val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
-      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
-      buffer.put(emptyMessageSet.getSerialized())
-      buffer.put(regularMessgeSet.getSerialized())
+      val buffer = ByteBuffer.allocate(emptyMessageSet.buffer.limit + regularMessgeSet.buffer.limit)
+      buffer.put(emptyMessageSet.buffer)
+      buffer.put(regularMessgeSet.buffer)
       buffer.rewind
       val mixedMessageSet = new ByteBufferMessageSet(buffer)
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure ByteBufferMessageSet is re-iterable.
       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
       //make sure the last offset after iteration is correct
-      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
+      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.buffer.limit)
 
       verifyShallowIterator(mixedMessageSet)
     }
Index: core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(revision 1379645)
+++ core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(working copy)
@@ -29,34 +29,10 @@
     new ByteBufferMessageSet(compressed, getMessageList(messages: _*))
   
   @Test
-  def testValidBytes() {
+  def testEquals() {
     val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                messages = getMessageList(new Message("hello".getBytes()),
-                                                                      new Message("there".getBytes())))
-    val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
-    buffer.put(messageList.getBuffer)
-    buffer.putShort(4)
-    val messageListPlus = new ByteBufferMessageSet(buffer)
-    assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
-  }
-
-  @Test
-  def testValidBytesWithCompression () {
-    val messageList = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
-                                               messages = getMessageList(new Message("hello".getBytes()),
                                                                          new Message("there".getBytes())))
-    val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
-    buffer.put(messageList.getBuffer)
-    buffer.putShort(4)
-    val messageListPlus = new ByteBufferMessageSet(buffer, 0, 0)
-    assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
-  }
-
-  @Test
-  def testEquals() {
-    val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                            messages = getMessageList(new Message("hello".getBytes()),
-                                                                      new Message("there".getBytes())))
     val moreMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                 messages = getMessageList(new Message("hello".getBytes()),
                                                                           new Message("there".getBytes())))
Index: core/src/main/scala/kafka/message/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/FileMessageSet.scala	(revision 1379645)
+++ core/src/main/scala/kafka/message/FileMessageSet.scala	(working copy)
@@ -40,8 +40,6 @@
   
   private val setSize = new AtomicLong()
 
-  def getSerialized(): ByteBuffer = throw new java.lang.UnsupportedOperationException()
-
   if(mutable) {
     if(limit < Long.MaxValue || offset > 0)
       throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
Index: core/src/main/scala/kafka/message/MessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/MessageSet.scala	(revision 1379645)
+++ core/src/main/scala/kafka/message/MessageSet.scala	(working copy)
@@ -110,10 +110,5 @@
       if(!messageAndOffset.message.isValid)
         throw new InvalidMessageException
   }
-  
-  /**
-   * Used to allow children to have serialization on implementation
-   */
-  def getSerialized(): ByteBuffer
 
 }
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1379645)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -21,7 +21,7 @@
 import java.nio.ByteBuffer
 import java.nio.channels._
 import kafka.utils.IteratorTemplate
-import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, ErrorMapping}
+import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException}
 
 /**
  * A sequence of messages stored in a byte buffer
@@ -33,29 +33,19 @@
  * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
  * 
  */
-class ByteBufferMessageSet(private val buffer: ByteBuffer,
-                           private val initialOffset: Long = 0L,
-                           private val errorCode: Short = ErrorMapping.NoError) extends MessageSet with Logging {
+class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet with Logging {
   private var shallowValidByteCount = -1L
   if(sizeInBytes > Int.MaxValue)
     throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
+    this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L)
   }
 
   def this(messages: Message*) {
     this(NoCompressionCodec, messages: _*)
   }
 
-  def getInitialOffset = initialOffset
-
-  def getBuffer = buffer
-
-  def getErrorCode = errorCode
-
-  def getSerialized(): ByteBuffer = buffer
-
   def validBytes: Long = shallowValidBytes
 
   private def shallowValidBytes: Long = {
@@ -96,7 +86,6 @@
 
   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
-    ErrorMapping.maybeThrowException(errorCode)
     new IteratorTemplate[MessageAndOffset] {
       var topIter = buffer.slice()
       var currValidBytes = initialOffset
@@ -192,12 +181,18 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
 
-  override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + initialOffset.hashCode
+  override def hashCode: Int = {
+    var hash = 17
+    hash = hash * 31 + buffer.hashCode
+    hash = hash * 31 + initialOffset.hashCode
+    hash
+  }
+
 }
Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
===================================================================
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(revision 1379645)
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(working copy)
@@ -68,8 +68,7 @@
    *  add an empty message with the exception to the queue so that client can see the error
    */
   def enqueueError(e: Throwable, fetchOffset: Long) = {
-    val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0,
-      errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0)
     chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
   }
 
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1379645)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -311,26 +311,31 @@
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
-        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
-          case Left(err) =>
+        val partitionInfo =
+          try {
+            val messages = readMessageSet(topic, partition, offset, fetchSize)
+            BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
+            BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
+            val leaderReplica = replicaManager.getReplica(topic, partition).get
+            if (fetchRequest.replicaId != FetchRequest.NonFollowerId) {
+              debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
+                .format(topic, partition, fetchRequest.replicaId))
+              debug("Leader returning %d messages for topic %s partition %d to follower %d"
+                .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+            }
+            new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages)
+          }
+          catch {
+          case e =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
+            error("error when processing request " + (topic, partition, offset, fetchSize), e)
             fetchRequest.replicaId match {
-              case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
-              case _ =>
-                new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+              case -1 => new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
+                                           offset, -1L, MessageSet.Empty)
+              case _ => new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
+                                          offset, -1L, MessageSet.Empty)
             }
-          case Right(messages) =>
-            BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
-            BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
-            val leaderReplica = replicaManager.getReplica(topic, partition).get
-            if (fetchRequest.replicaId != FetchRequest.NonFollowerId) {
-              debug("Leader for topic [%s] partition [%d] received fetch request from follower [%d]"
-                .format(topic, partition, fetchRequest.replicaId))
-              debug("Leader returning %d messages for topic %s partition %d to follower %d"
-                .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
-            }
-            new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages)
         }
         info.append(partitionInfo)
       }
@@ -342,20 +347,12 @@
   /**
    * Read from a single topic/partition at the given offset
    */
-  private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = {
-    var response: Either[Short, MessageSet] = null
-    try {
-      // check if the current broker is the leader for the partitions
-      val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partition)
-      trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
-      val log = localReplica.log.get
-      response = Right(log.read(offset, maxSize))
-    } catch {
-      case e =>
-        error("error when processing request " + (topic, partition, offset, maxSize), e)
-        response = Left(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    }
-    response
+  private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): MessageSet = {
+    // check if the current broker is the leader for the partitions
+    val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partition)
+    trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+    val log = localReplica.log.get
+    log.read(offset, maxSize)
   }
 
   /**
Index: core/src/main/scala/kafka/javaapi/Implicits.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/Implicits.scala	(revision 1379645)
+++ core/src/main/scala/kafka/javaapi/Implicits.scala	(working copy)
@@ -24,8 +24,7 @@
 
   implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
      kafka.javaapi.message.ByteBufferMessageSet = {
-    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
-                                                   messageSet.getErrorCode)
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.initialOffset)
   }
 
   implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
Index: core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala	(revision 1379645)
+++ core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala	(working copy)
@@ -17,20 +17,14 @@
 package kafka.javaapi.message
 
 import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
 import kafka.message._
 
-class ByteBufferMessageSet(private val buffer: ByteBuffer,
-                           private val initialOffset: Long = 0L,
-                           private val errorCode: Short = ErrorMapping.NoError) extends MessageSet {
-  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
-                                                                                              initialOffset,
-                                                                                              errorCode)
-  def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
+class ByteBufferMessageSet(private val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet {
+  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset)
+  def this(buffer: ByteBuffer) = this(buffer, 0L)
 
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
-         0L, ErrorMapping.NoError)
+    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), 0L)
   }
 
   def this(messages: java.util.List[Message]) {
@@ -39,14 +33,6 @@
 
   def validBytes: Long = underlying.validBytes
 
-  def serialized():ByteBuffer = underlying.getSerialized()
-
-  def getInitialOffset = initialOffset
-
-  def getBuffer = buffer
-
-  def getErrorCode = errorCode
-
   override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
     val underlyingIterator = underlying.iterator
     override def hasNext(): Boolean = {
@@ -67,13 +53,18 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
 
-  override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + initialOffset.hashCode
+  override def hashCode: Int = {
+    var hash = 17
+    hash = hash * 31 + buffer.hashCode
+    hash = hash * 31 + initialOffset.hashCode
+    hash
+  }
 
 }
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1379645)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -247,7 +247,7 @@
     logStats.recordAppendedMessages(numberOfMessages)
 
     // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
-    val validByteBuffer = messages.getBuffer.duplicate()
+    val validByteBuffer = messages.buffer.duplicate()
     val messageSetValidBytes = messages.validBytes
     if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
       throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
Index: core/src/main/scala/kafka/api/FetchResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchResponse.scala	(revision 1379645)
+++ core/src/main/scala/kafka/api/FetchResponse.scala	(working copy)
@@ -34,7 +34,7 @@
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
+    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
   }
 }
 
Index: core/src/main/scala/kafka/api/ProducerRequest.scala
===================================================================
--- core/src/main/scala/kafka/api/ProducerRequest.scala	(revision 1379645)
+++ core/src/main/scala/kafka/api/ProducerRequest.scala	(working copy)
@@ -76,9 +76,9 @@
       buffer.putInt(topicData.partitionDataArray.size) //the number of partitions
       for(partitionData <- topicData.partitionDataArray) {
         buffer.putInt(partitionData.partition)
-        buffer.putInt(partitionData.messages.getSerialized().limit)
-        buffer.put(partitionData.messages.getSerialized())
-        partitionData.messages.getSerialized().rewind
+        buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit)
+        buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer)
+        partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind
       }
     }
   }
Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
===================================================================
--- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(revision 1379645)
+++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(working copy)
@@ -24,7 +24,6 @@
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.OffsetRequest;
-import kafka.common.ErrorMapping;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
@@ -136,7 +135,6 @@
 
             while ( !gotNext && _respIterator.hasNext()) {
                 ByteBufferMessageSet msgSet = _respIterator.next();
-                if ( hasError(msgSet)) return false;
                 _messageIt = msgSet.iterator();
                 gotNext = get(key, value);
             }
@@ -249,36 +247,6 @@
         return range;
     }
     
-    /**
-     * Called by the default implementation of {@link #map} to check error code
-     * to determine whether to continue.
-     */
-    protected boolean hasError(ByteBufferMessageSet messages)
-            throws IOException {
-        short errorCode = messages.getErrorCode();
-        if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
-            /* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
-               Kafka server may delete old files from time to time */
-            System.err.println("WARNING: current offset=" + _offset + ". It is out of range.");
-
-            if (_retry >= MAX_RETRY_TIME)  return true;
-            _retry++;
-            // get the current offset range
-            _offsetRange = getOffsetRange();
-            _offset =  _offsetRange[0];
-            return false;
-        } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
-            throw new IOException(_input + " current offset=" + _offset
-                    + " : invalid offset.");
-        } else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
-            throw new IOException(_input + " : wrong partition");
-        } else if (errorCode != ErrorMapping.NoError()) {
-            throw new IOException(_input + " current offset=" + _offset
-                    + " error:" + errorCode);
-        } else
-            return false;
-    }
-    
     public static int getClientBufferSize(Props props) throws Exception {
         return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
     }
