Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala =================================================================== --- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (revision 1159148) +++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (working copy) @@ -109,7 +116,20 @@ var innerIter:Iterator[MessageAndOffset] = null var lastMessageSize = 0L - def innerDone():Boolean = (innerIter==null || !innerIter.hasNext) + def innerDone():Boolean = { + innerIter match { + case null => true // since the iterator is null, return true which means there are no messages in this level + case _ => + innerIter.hasNext match { + case true => false // since the iterator has more messages, return true + case false => // since this iterator has no more messages + currValidBytes += 4 + lastMessageSize + if(logger.isTraceEnabled) + logger.trace("Last message in this level. Updating currValidBytes to " + currValidBytes) + true + } + } + } def makeNextOuter: MessageAndOffset = { if (topIter.remaining < 4) { @@ -126,9 +146,9 @@ if(size < 0 || topIter.remaining < size) { deepValidByteCount = currValidBytes if (currValidBytes == 0 || size < 0) - throw new InvalidMessageSizeException("invalid message size: %d only received bytes: %d " + - " at %d possible causes (1) a single message larger than the fetch size; (2) log corruption " - .format(size, topIter.remaining, currValidBytes)) + throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " + + topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " + + "the fetch size; (2) log corruption )") return allDone() } val message = topIter.slice() @@ -141,6 +161,8 @@ logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes)) innerIter = null currValidBytes += 4 + size + if(logger.isTraceEnabled) + logger.trace("currValidBytes = " + currValidBytes) new MessageAndOffset(newMessage, currValidBytes) case _ => if(logger.isDebugEnabled) @@ -151,14 +173,13 @@ } override def makeNext(): MessageAndOffset = { + val isInnerDone = innerDone() if(logger.isDebugEnabled) - logger.debug("makeNext() in deepIterator: innerDone = " + innerDone) - innerDone match { + logger.debug("makeNext() in deepIterator: innerDone = " + isInnerDone) + isInnerDone match { case true => makeNextOuter case false => { val messageAndOffset = innerIter.next - if(!innerIter.hasNext) - currValidBytes += 4 + lastMessageSize new MessageAndOffset(messageAndOffset.message, currValidBytes) } }