Index: core/src/test/resources/log4j.properties
===================================================================
--- core/src/test/resources/log4j.properties	(revision 1302665)
+++ core/src/test/resources/log4j.properties	(working copy)
@@ -21,5 +21,5 @@
 log4j.logger.kafka=WARN
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
-log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.I0Itec.zkclient.ZkClient=OFF
+log4j.logger.org.apache.zookeeper=OFF
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1302665)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -199,7 +199,7 @@
    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
    * Returns the offset at which the messages are written.
    */
-  def append(messages: MessageSet): Unit = {
+  def append(messages: ByteBufferMessageSet): Unit = {
     // validate the messages
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
@@ -211,12 +211,22 @@
     BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages)
     BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
     logStats.recordAppendedMessages(numberOfMessages)
-    
+
+    // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
+    val validByteBuffer = messages.getBuffer.duplicate()
+    val messageSetValidBytes = messages.validBytes
+    if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+        " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+
+    validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+    val validMessages = new ByteBufferMessageSet(validByteBuffer)
+
     // they are valid, insert them in the log
     lock synchronized {
       try {
         val segment = segments.view.last
-        segment.messageSet.append(messages)
+        segment.messageSet.append(validMessages)
         maybeFlush(numberOfMessages)
         maybeRoll(segment)
       }
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1302665)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -61,7 +61,7 @@
 
   private def shallowValidBytes: Long = {
     if(shallowValidByteCount < 0) {
-      val iter = this.internalIterator()
+      val iter = this.internalIterator(true)
       while(iter.hasNext) {
        val messageAndOffset = iter.next
        shallowValidByteCount = messageAndOffset.offset
@@ -88,7 +88,6 @@
     }
   }
 
-  /** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. This is used in verifyMessageSize() function **/
   private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
     ErrorMapping.maybeThrowException(errorCode)