Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1378414)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -255,6 +255,7 @@
     }
   }
 
+
   /**
    * Read from the log file at the given offset
    */
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(revision 1378414)
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(working copy)
@@ -22,7 +22,7 @@
 import kafka.network._
 import kafka.message._
 import kafka.api._
-import kafka.common.ErrorMapping
+import kafka.common.{MessageSizeTooLargeException, ErrorMapping}
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils._
 
@@ -73,11 +73,15 @@
       BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(request.messages.sizeInBytes)
     }
     catch {
-      case e =>
-        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
+      case e: MessageSizeTooLargeException =>
+        warn(e.getMessage())
         BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
         BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-        throw e
+      case t =>
+        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, t)
+        BrokerTopicStat.getBrokerTopicStat(request.topic).recordFailedProduceRequest
+        BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+        throw t
     }
   }
 
