Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1245210)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -46,10 +46,15 @@
   @volatile
   private var shutdown: Boolean = false
 
-  debug("Instantiating Scala Sync Producer")
+  trace("Instantiating Scala Sync Producer")
 
   private def verifySendBuffer(buffer : ByteBuffer) = {
-    if (logger.isTraceEnabled) {
+    /**
+     * This seems a little convoluted, but the idea is to turn on verification simply changing log4j settings
+     * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
+     * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level
+     */
+    if (logger.isDebugEnabled) {
       trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
       if (requestTypeId == RequestKeys.MultiProduce) {
@@ -59,17 +64,17 @@
             try {
               for (messageAndOffset <- produce.messages)
                 if (!messageAndOffset.message.isValid)
-                  trace("topic " + produce.topic + " is invalid")
+                  throw new InvalidMessageException("Message for topic " + produce.topic + " is invalid")
             }
             catch {
               case e: Throwable =>
-                trace("error iterating messages ", e)
+                error("error iterating messages ", e)
             }
           }
         }
         catch {
           case e: Throwable =>
-            trace("error verifying sendbuffer ", e)
+            error("error verifying sendbuffer ", e)
         }
       }
     }
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(revision 1245210)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala	(working copy)
@@ -38,6 +38,8 @@
                            private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
   private var validByteCount = -1L
   private var shallowValidByteCount = -1L
+  if(sizeInBytes > Int.MaxValue)
+    throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
 
   def this(compressionCodec: CompressionCodec, messages: Message*) {
     this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
@@ -108,12 +110,16 @@
         val newMessage = new Message(message)
         newMessage.compressionCodec match {
           case NoCompressionCodec =>
+            if(!newMessage.isValid)
+              throw new InvalidMessageException("Uncompressed essage is invalid")
             debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
             innerIter = null
             currValidBytes += 4 + size
             trace("currValidBytes = " + currValidBytes)
             new MessageAndOffset(newMessage, currValidBytes)
           case _ =>
+            if(!newMessage.isValid)
+              throw new InvalidMessageException("Compressed message is invalid")
             debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
             innerIter = CompressionUtils.decompress(newMessage).deepIterator
             if (!innerIter.hasNext) {
Index: core/src/main/scala/kafka/message/InvalidMessageException.scala
===================================================================
--- core/src/main/scala/kafka/message/InvalidMessageException.scala	(revision 1245210)
+++ core/src/main/scala/kafka/message/InvalidMessageException.scala	(working copy)
@@ -20,4 +20,6 @@
 /**
  * Indicates that a message failed its checksum and is corrupt
  */
-class InvalidMessageException extends RuntimeException
+class InvalidMessageException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
