Index: core/src/test/scala/unit/kafka/log/LogTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogTest.scala	(revision 1378359)
+++ core/src/test/scala/unit/kafka/log/LogTest.scala	(working copy)
@@ -23,18 +23,22 @@
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
-import kafka.common.{KafkaException, OffsetOutOfRangeException}
+import kafka.common.{MessageSizeTooLargeException, KafkaException, OffsetOutOfRangeException}
 import kafka.utils._
 import scala.Some
+import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
   
   var logDir: File = null
   val time = new MockTime
+  var config: KafkaConfig = null
 
   @Before
   def setUp() {
     logDir = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, -1)
+    config = new KafkaConfig(props)
   }
 
   @After
@@ -55,7 +59,7 @@
     val time: MockTime = new MockTime()
 
     // create a log
-    val log = new Log(logDir, 1000, 1000, rollMs, false, time)
+    val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, false, time)
     time.currentMs += rollMs + 1
 
     // segment age is less than its limit
@@ -89,7 +93,7 @@
     val logFileSize = msgPerSeg * (setSize - 1).asInstanceOf[Int] // each segment will be 10 messages
 
     // create a log
-    val log = new Log(logDir, logFileSize, 1000, 10000, false, time)
+    val log = new Log(logDir, logFileSize, config.maxMessageSize, 1000, 10000, false, time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
 
     // segments expire in size
@@ -102,14 +106,14 @@
   @Test
   def testLoadEmptyLog() {
     createEmptyLogs(logDir, 0)
-    new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+    new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
   }
 
   @Test
   def testLoadInvalidLogsFails() {
     createEmptyLogs(logDir, 0, 15)
     try {
-      new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+      new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
       fail("Allowed load of corrupt logs without complaint.")
     } catch {
       case e: KafkaException => "This is good"
@@ -118,7 +122,7 @@
 
   @Test
   def testAppendAndRead() {
-    val log = new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
@@ -135,7 +139,7 @@
   @Test
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
-    val log = new Log(logDir, 1024, 1000, 24*7*60*60*1000L, false, time)
+    val log = new Log(logDir, 1024, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
     assertEquals("Reading just beyond end of log should produce 0 byte read.", 0L, log.read(1024, 1000).sizeInBytes)
     try {
       log.read(0, 1024)
@@ -155,7 +159,7 @@
   @Test
   def testLogRolls() {
     /* create a multipart log with 100 messages */
-    val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
+    val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
     val numMessages = 100
     for(i <- 0 until numMessages)
       log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -210,7 +214,7 @@
   def testEdgeLogRolls() {
     {
       // first test a log segment starting at 0
-      val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
+      val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
       val curOffset = log.logEndOffset
       assertEquals(curOffset, 0)
 
@@ -223,7 +227,7 @@
 
     {
       // second test an empty log segment starting at none-zero
-      val log = new Log(logDir, 100, 1000, 24*7*60*60*1000L, false, time)
+      val log = new Log(logDir, 100, config.maxMessageSize, 1000, config.logRollHours*60*60*1000L, false, time)
       val numMessages = 1
       for(i <- 0 until numMessages)
         log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
@@ -246,6 +250,35 @@
     }
   }
 
+  @Test
+  def testMessageSizeCheck() {
+    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes()), new Message("bethe".getBytes()))
+    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
+
+    // append messages to log
+    val log = new Log(logDir, 100, 5, 1000, config.logRollHours*60*60*1000L, false, time)
+
+    var ret =
+      try {
+        log.append(first)
+        true
+      }
+      catch {
+        case e: MessageSizeTooLargeException => false
+      }
+    assert(ret, "First messageset should pass.")
+
+    ret =
+      try {
+        log.append(second)
+        false
+      }
+      catch {
+        case e:MessageSizeTooLargeException => true
+      }
+    assert(ret, "Second message set should throw MessageSizeTooLargeException.")
+  }
+
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) => 
Index: core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(revision 1378359)
+++ core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala	(working copy)
@@ -93,7 +93,7 @@
   }
 
   @Test
-  def testSingleMessageSizeTooLarge() {
+  def testMessageSizeTooLarge() {
     val server = servers.head
     val props = new Properties()
     props.put("host", "localhost")
@@ -101,35 +101,25 @@
     props.put("buffer.size", "102400")
     props.put("connect.timeout.ms", "300")
     props.put("reconnect.interval", "500")
-    props.put("max.message.size", "100")
     val producer = new SyncProducer(new SyncProducerConfig(props))
-    val bytes = new Array[Byte](101)
-    try {
-      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes))))
-      Assert.fail("Message was too large to send, SyncProducer should have thrown exception.")
-    } catch {
-      case e: MessageSizeTooLargeException => /* success */
-    }
-  }
+    CreateTopicCommand.createTopic(zkClient, "test", 1, 1)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500)
 
-  @Test
-  def testCompressedMessageSizeTooLarge() {
-    val server = servers.head
-    val props = new Properties()
-    props.put("host", "localhost")
-    props.put("port", server.socketServer.port.toString)
-    props.put("buffer.size", "102400")
-    props.put("connect.timeout.ms", "300")
-    props.put("reconnect.interval", "500")
-    props.put("max.message.size", "100")
-    val producer = new SyncProducer(new SyncProducerConfig(props))
-    val bytes = new Array[Byte](101)
-    try {
-      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec, messages = new Message(bytes))))
-      Assert.fail("Message was too large to send, SyncProducer should have thrown exception for DefaultCompressionCodec.")
-    } catch {
-      case e: MessageSizeTooLargeException => /* success */
-    }
+    val message1 = new Message(new Array[Byte](1000001))
+    val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
+    val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
+
+    Assert.assertEquals(1, response1.errors.length)
+    Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.errors(0))
+    Assert.assertEquals(-1L, response1.offsets(0))
+
+    val message2 = new Message(new Array[Byte](1000000))
+    val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
+    val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
+
+    Assert.assertEquals(1, response2.errors.length)
+    Assert.assertEquals(ErrorMapping.NoError, response2.errors(0))
+    Assert.assertEquals(messageSet2.sizeInBytes, response2.offsets(0))
   }
 
   @Test
Index: core/src/test/scala/other/kafka/TestLogPerformance.scala
===================================================================
--- core/src/test/scala/other/kafka/TestLogPerformance.scala	(revision 1378359)
+++ core/src/test/scala/other/kafka/TestLogPerformance.scala	(working copy)
@@ -19,6 +19,7 @@
 
 import kafka.message._
 import kafka.utils.{SystemTime, TestUtils, Utils}
+import kafka.server.KafkaConfig
 
 object TestLogPerformance {
 
@@ -29,8 +30,10 @@
     val messageSize = args(1).toInt
     val batchSize = args(2).toInt
     val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
+    val props = TestUtils.createBrokerConfig(0, -1)
+    val config = new KafkaConfig(props)
     val dir = TestUtils.tempDir()
-    val log = new Log(dir, 50*1024*1024, 5000000, 24*7*60*60*1000L, false, SystemTime)
+    val log = new Log(dir, 50*1024*1024, config.maxMessageSize, 5000000, config.logRollHours*60*60*1000L, false, SystemTime)
     val bytes = new Array[Byte](messageSize)
     new java.util.Random().nextBytes(bytes)
     val message = new Message(bytes)
Index: core/src/main/scala/kafka/producer/SyncProducer.scala
===================================================================
--- core/src/main/scala/kafka/producer/SyncProducer.scala	(revision 1378359)
+++ core/src/main/scala/kafka/producer/SyncProducer.scala	(working copy)
@@ -103,7 +103,6 @@
   def send(producerRequest: ProducerRequest): ProducerResponse = {
     for( topicData <- producerRequest.data ) {
       for( partitionData <- topicData.partitionDataArray ) {
-	      verifyMessageSize(partitionData.messages)
         val setSize = partitionData.messages.sizeInBytes.asInstanceOf[Int]
         trace("Got message set with " + setSize + " bytes to send")
       }
@@ -127,12 +126,6 @@
     }
   }
 
-  private def verifyMessageSize(messages: MessageSet) {
-    for (messageAndOffset <- messages)
-      if (messageAndOffset.message.payloadSize > config.maxMessageSize)
-        throw new MessageSizeTooLargeException
-  }
-
   private def reconnect() {
     disconnect()
     connect()
Index: core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
===================================================================
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(revision 1378359)
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala	(working copy)
@@ -192,8 +192,10 @@
         val errors = new ListBuffer[(String, Int)]
         for( topic <- topicData; partition <- topic.partitionDataArray ) {
           msgIdx += 1
-          if(msgIdx > response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
+          if (msgIdx >= response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError)
             errors.append((topic.topic, partition.partition))
+          if (msgIdx < response.errors.size && response.errors(msgIdx) == ErrorMapping.MessageSizeTooLargeCode)
+              warn("payload size larger than the maxMessageSize at broker %d on %s:%d".format(brokerId, topic.topic, partition.partition))
         }
         errors
       } catch {
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1378359)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -51,6 +51,9 @@
   
   /* the maximum number of bytes in a socket request */
   val maxSocketRequestSize: Int = props.getIntInRange("max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
+
+  /* the maximum size of message that the server can receive */
+  val maxMessageSize = props.getIntInRange("max.message.size", 1000000, (0, Int.MaxValue))
   
   /* the number of network threads that the server uses for handling network requests */
   val numNetworkThreads = props.getIntInRange("network.threads", 3, (1, Int.MaxValue))
Index: core/src/main/scala/kafka/server/KafkaApis.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaApis.scala	(revision 1378359)
+++ core/src/main/scala/kafka/server/KafkaApis.scala	(working copy)
@@ -194,12 +194,13 @@
           case e =>
             BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
-            error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
             e match {
               case _: IOException =>
+                error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
                 fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
                 System.exit(1)
               case _ =>
+                error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
                 errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort
                 offsets(msgIndex) = -1
             }
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1378359)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -128,9 +128,9 @@
  * An append-only log for storing messages. 
  */
 @threadsafe
-private[kafka] class Log( val dir: File, val maxSize: Long,
-                          val flushInterval: Int, val rollIntervalMs: Long, val needRecovery: Boolean,
-                          time: Time, brokerId: Int = 0) extends Logging {
+private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessageSize: Int, val flushInterval: Int,
+                          val rollIntervalMs: Long, val needRecovery: Boolean, time: Time,
+                          brokerId: Int = 0) extends Logging {
   this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
 
   import kafka.log.Log._
@@ -235,6 +235,7 @@
    */
   def append(messages: ByteBufferMessageSet): Unit = {
     // validate the messages
+    messages.verifyMessageSize(maxMessageSize)
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
       if(!messageAndOffset.message.isValid)
@@ -324,7 +325,7 @@
    * Roll the log over if necessary
    */
   private def maybeRoll(segment: LogSegment) {
-    if ((segment.messageSet.sizeInBytes > maxSize) ||
+    if ((segment.messageSet.sizeInBytes > maxLogFileSize) ||
        ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)))
       roll()
   }
Index: core/src/main/scala/kafka/log/LogManager.scala
===================================================================
--- core/src/main/scala/kafka/log/LogManager.scala	(revision 1378359)
+++ core/src/main/scala/kafka/log/LogManager.scala	(working copy)
@@ -66,7 +66,7 @@
         val topic = Utils.getTopicPartition(dir.getName)._1
         val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
         val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
-        val log = new Log(dir, maxLogFileSize, flushInterval, rollIntervalMs, needRecovery, time, config.brokerId)
+        val log = new Log(dir, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery, time, config.brokerId)
         val topicPartition = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartition._1)
@@ -108,7 +108,7 @@
       d.mkdirs()
       val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
       val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
-      new Log(d, maxLogFileSize, flushInterval, rollIntervalMs, false, time, config.brokerId)
+      new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false, time, config.brokerId)
     }
   }
 
Index: core/src/main/scala/kafka/common/ErrorMapping.scala
===================================================================
--- core/src/main/scala/kafka/common/ErrorMapping.scala	(revision 1378359)
+++ core/src/main/scala/kafka/common/ErrorMapping.scala	(working copy)
@@ -40,6 +40,7 @@
   val RequestTimedOutCode: Short = 8
   val BrokerNotExistInZookeeperCode: Short = 9
   val ReplicaNotAvailableCode: Short = 10
+  val MessageSizeTooLargeCode: Short = 11
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -52,7 +53,8 @@
       classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
       classOf[BrokerNotExistException].asInstanceOf[Class[Throwable]] -> BrokerNotExistInZookeeperCode,
-      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode
+      classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
+      classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */
