Index: core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(revision 1374430)
+++ core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala	(working copy)
@@ -48,7 +48,7 @@
     val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
     buffer.put(messageList.getBuffer)
     buffer.putShort(4)
-    val messageListPlus = new ByteBufferMessageSet(buffer, 0, 0)
+    val messageListPlus = new ByteBufferMessageSet(buffer, 0)
     assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
   }
 
Index: core/src/main/scala/kafka/javaapi/Implicits.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/Implicits.scala	(revision 1374430)
+++ core/src/main/scala/kafka/javaapi/Implicits.scala	(working copy)
@@ -24,8 +24,7 @@
 
   implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
      kafka.javaapi.message.ByteBufferMessageSet = {
-    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
-                                                   messageSet.getErrorCode)
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset)
   }
 
   implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
Index: core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala	(revision 1374430)
+++ core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala	(working copy)
@@ -21,16 +21,14 @@
 import kafka.message._
 
 class ByteBufferMessageSet(private val buffer: ByteBuffer,
-                           private val initialOffset: Long = 0L,
-                           private val errorCode: Short = ErrorMapping.NoError) extends MessageSet {
+                           private val initialOffset: Long = 0L) extends MessageSet {
   val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
-                                                                                              initialOffset,
-                                                                                              errorCode)
-  def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
+                                                                                              initialOffset)
+  def this(buffer: ByteBuffer) = this(buffer, 0L)
 
   def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
     this(MessageSet.createByteBuffer(compressionCodec, scala.collection.JavaConversions.asBuffer(messages): _*),
-         0L, ErrorMapping.NoError)
+         0L)
   }
 
   def this(messages: java.util.List[Message]) {
@@ -45,8 +43,6 @@
 
   def getBuffer = buffer
 
-  def getErrorCode = errorCode
-
   override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
     val underlyingIterator = underlying.iterator
     override def hasNext(): Boolean = {
@@ -67,13 +63,11 @@
   override def equals(other: Any): Boolean = {
     other match {
       case that: ByteBufferMessageSet =>
-        (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
+        (that canEqual this) == buffer.equals(that.buffer) && initialOffset == that.initialOffset
       case _ => false
     }
   }
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
 
-  override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + initialOffset.hashCode
-
 }
Index: contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
===================================================================
--- contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(revision 1374430)
+++ contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java	(working copy)
@@ -136,7 +136,6 @@
 
             while ( !gotNext && _respIterator.hasNext()) {
                 ByteBufferMessageSet msgSet = _respIterator.next();
-                if ( hasError(msgSet)) return false;
                 _messageIt = msgSet.iterator();
                 gotNext = get(key, value);
             }
@@ -249,36 +248,6 @@
         return range;
     }
     
-    /**
-     * Called by the default implementation of {@link #map} to check error code
-     * to determine whether to continue.
-     */
-    protected boolean hasError(ByteBufferMessageSet messages)
-            throws IOException {
-        short errorCode = messages.getErrorCode();
-        if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
-            /* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
-               Kafka server may delete old files from time to time */
-            System.err.println("WARNING: current offset=" + _offset + ". It is out of range.");
-
-            if (_retry >= MAX_RETRY_TIME)  return true;
-            _retry++;
-            // get the current offset range
-            _offsetRange = getOffsetRange();
-            _offset =  _offsetRange[0];
-            return false;
-        } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
-            throw new IOException(_input + " current offset=" + _offset
-                    + " : invalid offset.");
-        } else if (errorCode == ErrorMapping.InvalidPartitionCode()) {
-            throw new IOException(_input + " : wrong partition");
-        } else if (errorCode != ErrorMapping.NoError()) {
-            throw new IOException(_input + " current offset=" + _offset
-                    + " error:" + errorCode);
-        } else
-            return false;
-    }
-    
     public static int getClientBufferSize(Props props) throws Exception {
         return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
     }
