Index: core/src/main/scala/kafka/tools/ConsumerShell.scala =================================================================== --- core/src/main/scala/kafka/tools/ConsumerShell.scala (revision 1181967) +++ core/src/main/scala/kafka/tools/ConsumerShell.scala (working copy) @@ -98,7 +98,7 @@ } }catch { case e:ConsumerTimeoutException => // this is ok - case oe: Exception => logger.error(oe) + case oe: Exception => logger.error("error in ZKConsumerThread", oe) } shutdownLatch.countDown println("Received " + count + " messages") Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (revision 1181967) +++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (working copy) @@ -99,7 +99,7 @@ logger.trace("size of data = " + size) } if(size < 0 || topIter.remaining < size) { - if (currValidBytes == 0 || size < 0) + if (currValidBytes == initialOffset || size < 0) throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " + topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " + "the fetch size; (2) log corruption )") Index: core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala =================================================================== --- core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (revision 1181967) +++ core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (working copy) @@ -76,7 +76,8 @@ * add an empty message with the exception to the queue so that client can see the error */ def enqueueError(e: Throwable, fetchOffset: Long) = { - val messages = new ByteBufferMessageSet(ErrorMapping.EmptyByteBuffer, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0, + errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)) }