Index: core/src/test/resources/log4j.properties
===================================================================
--- core/src/test/resources/log4j.properties	(revision 1302665)
+++ core/src/test/resources/log4j.properties	(working copy)
@@ -21,5 +21,5 @@
 log4j.logger.kafka=WARN
 
 # zkclient can be verbose, during debugging it is common to adjust is separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
-log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.I0Itec.zkclient.ZkClient=OFF
+log4j.logger.org.apache.zookeeper=OFF
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1302665)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -199,7 +199,7 @@
    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
    * Returns the offset at which the messages are written.
    */
-  def append(messages: MessageSet): Unit = {
+  def append(messages: ByteBufferMessageSet): Unit = {
     // validate the messages
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
@@ -211,12 +211,22 @@
     BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages)
     BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
     logStats.recordAppendedMessages(numberOfMessages)
-    
+
+    // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
+    val validByteBuffer = messages.getBuffer.duplicate()
+    val messageSetValidBytes = messages.validBytes
+    if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+      throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+        " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+
+    validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+    val validMessages = new ByteBufferMessageSet(validByteBuffer)
+
     // they are valid, insert them in the log
     lock synchronized {
       try {
         val segment = segments.view.last
-        segment.messageSet.append(messages)
+        segment.messageSet.append(validMessages)
         maybeFlush(numberOfMessages)
         maybeRoll(segment)
       }
