Index: core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(revision 1379645)
+++ core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala	(working copy)
@@ -189,8 +189,7 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics)
-          response.messageSet(topic, partition).iterator
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case e: OffsetOutOfRangeException => "this is good"
@@ -206,9 +205,7 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics) {
-          response.messageSet(topic, -1).iterator
-        }
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: UnknownTopicOrPartitionException => "this is good"
@@ -256,8 +253,7 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, partition) <- topics)
-          response.messageSet(topic, partition).iterator
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid offset")
       } catch {
         case e: OffsetOutOfRangeException => "this is good"
@@ -273,8 +269,7 @@
       try {
         val request = builder.build()
         val response = consumer.fetch(request)
-        for( (topic, _) <- topics)
-          response.messageSet(topic, -1).iterator
+        response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
         fail("Expected exception when fetching message with invalid partition")
       } catch {
         case e: UnknownTopicOrPartitionException => "this is good"
Index: core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(revision 1379645)
+++ core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala	(working copy)
@@ -25,7 +25,7 @@
 import scala.collection._
 import kafka.producer.ProducerData
 import kafka.utils.TestUtils
-import kafka.common.{KafkaException, OffsetOutOfRangeException}
+import kafka.common.{ErrorMapping, KafkaException, OffsetOutOfRangeException}
 
 /**
  * End to end tests of the primitive apis against a local server
@@ -72,7 +72,7 @@
     // send an invalid offset
     try {
       val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
-      fetchedWithError.messageSet(topic, 0).iterator
+      fetchedWithError.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
       fail("Expected an OffsetOutOfRangeException exception to be thrown")
     } catch {
       case e: OffsetOutOfRangeException => 
@@ -109,9 +109,9 @@
 
       val request = builder.build()
       val responses = consumer.fetch(request)
-      for( (topic, offset) <- topicOffsets ) {
+      for(topicData <- responses.data) {
         try {
-          responses.messageSet(topic, offset).iterator
+          topicData.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
           fail("Expected an OffsetOutOfRangeException exception to be thrown")
         } catch {
           case e: OffsetOutOfRangeException =>
Index: core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(revision 1379645)
+++ core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(working copy)
@@ -32,9 +32,9 @@
   def testValidBytes() {
     val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                messages = getMessageList(new Message("hello".getBytes()),
-                                                                      new Message("there".getBytes())))
+                                                                         new Message("there".getBytes())))
     val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
-    buffer.put(messageList.getBuffer)
+    buffer.put(messageList.buffer)
     buffer.putShort(4)
     val messageListPlus = new ByteBufferMessageSet(buffer)
     assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
@@ -46,17 +46,17 @@
                                                messages = getMessageList(new Message("hello".getBytes()),
                                                                          new Message("there".getBytes())))
     val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
-    buffer.put(messageList.getBuffer)
+    buffer.put(messageList.buffer)
     buffer.putShort(4)
-    val messageListPlus = new ByteBufferMessageSet(buffer, 0, 0)
+    val messageListPlus = new ByteBufferMessageSet(buffer, 0)
     assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
   }
 
   @Test
   def testEquals() {
     val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
-                                            messages = getMessageList(new Message("hello".getBytes()),
-                                                                      new Message("there".getBytes())))
+                                               messages = getMessageList(new Message("hello".getBytes()),
+                                                                         new Message("there".getBytes())))
     val moreMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                 messages = getMessageList(new Message("hello".getBytes()),
                                                                           new Message("there".getBytes())))
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1379645)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -33,27 +33,19 @@
  * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
  * 
  */
-class ByteBufferMessageSet(private val buffer: ByteBuffer,
-                           private val initialOffset: Long = 0L,
-                           private val errorCode: Short = ErrorMapping.NoError) extends MessageSet with Logging {
+class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet with Logging {
   private var shallowValidByteCount = -1L
   if(sizeInBytes > Int.MaxValue)
     throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
-    this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
+    this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L)
   }
 
   def this(messages: Message*) {
     this(NoCompressionCodec, messages: _*)
   }
 
-  def getInitialOffset = initialOffset
-
-  def getBuffer = buffer
-
-  def getErrorCode = errorCode
-
   def getSerialized(): ByteBuffer = buffer
 
   def validBytes: Long = shallowValidBytes
@@ -96,7 +88,6 @@
 
   /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
-    ErrorMapping.maybeThrowException(errorCode)
     new IteratorTemplate[MessageAndOffset] {
       var topIter = buffer.slice()
       var currValidBytes = initialOffset
@@ -192,12 +183,12 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
   override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
 
-  override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + initialOffset.hashCode
+  override def hashCode: Int = 31 + buffer.hashCode + initialOffset.hashCode
 }
Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
===================================================================
--- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(revision 1379645)
+++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala	(working copy)
@@ -68,8 +68,7 @@
    *  add an empty message with the exception to the queue so that client can see the error
    */
   def enqueueError(e: Throwable, fetchOffset: Long) = {
-    val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0,
-      errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+    val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0)
     chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
   }
 
Index: core/src/main/scala/kafka/javaapi/Implicits.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/Implicits.scala	(revision 1379645)
+++ core/src/main/scala/kafka/javaapi/Implicits.scala	(working copy)
@@ -24,8 +24,7 @@
 
   implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
      kafka.javaapi.message.ByteBufferMessageSet = {
-    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
-                                                   messageSet.getErrorCode)
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.initialOffset)
   }
 
   implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
Index: core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala	(revision 1379645)
+++ core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala	(working copy)
@@ -20,17 +20,12 @@
 import kafka.common.ErrorMapping
 import kafka.message._
 
-class ByteBufferMessageSet(private val buffer: ByteBuffer,
-                           private val initialOffset: Long = 0L,
-                           private val errorCode: Short = ErrorMapping.NoError) extends MessageSet {
-  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
-                                                                                              initialOffset,
-                                                                                              errorCode)
-  def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
+class ByteBufferMessageSet(val buffer: ByteBuffer, val initialOffset: Long = 0L) extends MessageSet {
+  val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset)
+  def this(buffer: ByteBuffer) = this(buffer, 0L)
 
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
-    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
-         0L, ErrorMapping.NoError)
+    this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*), 0L)
   }
 
   def this(messages: java.util.List[Message]) {
@@ -41,12 +36,6 @@
 
   def serialized():ByteBuffer = underlying.getSerialized()
 
-  def getInitialOffset = initialOffset
-
-  def getBuffer = buffer
-
-  def getErrorCode = errorCode
-
   override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
     val underlyingIterator = underlying.iterator
     override def hasNext(): Boolean = {
@@ -67,13 +56,12 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        (that canEqual this) && buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
 
-  override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + initialOffset.hashCode
-
+  override def hashCode: Int = 31 + buffer.hashCode + initialOffset.hashCode
 }
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1379645)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -247,7 +247,7 @@
     logStats.recordAppendedMessages(numberOfMessages)
 
     // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
-    val validByteBuffer = messages.getBuffer.duplicate()
+    val validByteBuffer = messages.buffer.duplicate()
     val messageSetValidBytes = messages.validBytes
     if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
       throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
Index: core/src/main/scala/kafka/api/FetchResponse.scala
===================================================================
--- core/src/main/scala/kafka/api/FetchResponse.scala	(revision 1379645)
+++ core/src/main/scala/kafka/api/FetchResponse.scala	(working copy)
@@ -34,7 +34,7 @@
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset, error))
+    new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
   }
 }
 
Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
===================================================================
--- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(revision 1379645)
+++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(working copy)
@@ -136,7 +136,6 @@
 
             while ( !gotNext && _respIterator.hasNext()) {
                 ByteBufferMessageSet msgSet = _respIterator.next();
-                if ( hasError(msgSet)) return false;
                 _messageIt = msgSet.iterator();
                 gotNext = get(key, value);
             }
@@ -249,36 +248,6 @@
         return range;
     }
     
-    /**
-     * Called by the default implementation of {@link #map} to check error code
-     * to determine whether to continue.
-     */
-    protected boolean hasError(ByteBufferMessageSet messages)
-            throws IOException {
-        short errorCode = messages.getErrorCode();
-        if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
-            /* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
-               Kafka server may delete old files from time to time */
-            System.err.println("WARNING: current offset=" + _offset + ". It is out of range.");
-
-            if (_retry >= MAX_RETRY_TIME)  return true;
-            _retry++;
-            // get the current offset range
-            _offsetRange = getOffsetRange();
-            _offset =  _offsetRange[0];
-            return false;
-        } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
-            throw new IOException(_input + " current offset=" + _offset
-                    + " : invalid offset.");
-        } else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
-            throw new IOException(_input + " : wrong partition");
-        } else if (errorCode != ErrorMapping.NoError()) {
-            throw new IOException(_input + " current offset=" + _offset
-                    + " error:" + errorCode);
-        } else
-            return false;
-    }
-    
     public static int getClientBufferSize(Props props) throws Exception {
         return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
     }
