Index: core/src/test/scala/unit/kafka/log/LogTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogTest.scala	(revision 1376329)
+++ core/src/test/scala/unit/kafka/log/LogTest.scala	(working copy)
@@ -23,16 +23,19 @@
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
 import kafka.utils.{Utils, TestUtils, Range}
-import kafka.common.OffsetOutOfRangeException
+import kafka.common.{MessageSizeTooLargeException, OffsetOutOfRangeException}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.server.KafkaConfig
 
 class LogTest extends JUnitSuite {
   
   var logDir: File = null
-
+  var config:KafkaConfig = null
   @Before
   def setUp() {
     logDir = TestUtils.tempDir()
+    val props = TestUtils.createBrokerConfig(0, -1)
+    config = new KafkaConfig(props)
   }
 
   @After
@@ -67,7 +70,7 @@
     val log = new Log(logDir, 1024, 1000, false)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 10)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message), config.maxMessageSize)
     log.flush()
     val messages = log.read(0, 1024)
     var current = 0
@@ -104,7 +107,7 @@
     val log = new Log(logDir, 100, 1000, false)
     val numMessages = 100
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
+      log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()), config.maxMessageSize)
     log.flush
     
     /* now do successive reads and iterate over the resulting message sets counting the messages
@@ -172,7 +175,7 @@
       val log = new Log(logDir, 100, 1000, false)
       val numMessages = 1
       for(i <- 0 until numMessages)
-        log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
+        log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()), config.maxMessageSize)
 
       val curOffset = log.nextAppendOffset
       // time goes by; the log file is deleted
@@ -192,6 +195,35 @@
     }
   }
 
+  @Test
+  def testMessageSizeCheck() {
+    // first set's messages fit under the 5-byte limit used below; second's does not
+    val first = new ByteBufferMessageSet(NoCompressionCodec, new Message("You".getBytes()), new Message("bethe".getBytes()))
+    val second = new ByteBufferMessageSet(NoCompressionCodec, new Message("change".getBytes()))
+
+    // append messages to log
+    val log = new Log(logDir, 100, 1000, false)
+
+    // "You" and "bethe" payloads are each <= 5 bytes, so this append must succeed
+    val firstAppended =
+      try {
+        log.append(first, 5)
+        true
+      } catch {
+        case e: MessageSizeTooLargeException => false
+      }
+    assert(firstAppended, "First messageset should pass.")
+
+    // "change" is 6 bytes, over the limit, so this append must be rejected
+    val secondRejected =
+      try {
+        log.append(second, 5)
+        false
+      } catch {
+        case e: MessageSizeTooLargeException => true
+      }
+    assert(secondRejected, "Second message set should throw MessageSizeTooLargeException.")
+  }
+
   def assertContains(ranges: Array[Range], offset: Long) = {
     Log.findRange(ranges, offset) match {
       case Some(range) => 
Index: core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(revision 1376329)
+++ core/src/test/scala/unit/kafka/log/LogOffsetTest.scala	(working copy)
@@ -99,7 +99,7 @@
 
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 20)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message), server.config.maxMessageSize)
     log.flush()
 
     Thread.sleep(100)
@@ -152,7 +152,7 @@
     val log = logManager.getOrCreateLog(topic, part)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 20)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message), server.config.maxMessageSize)
     log.flush()
 
     val now = System.currentTimeMillis
@@ -177,7 +177,7 @@
     val log = logManager.getOrCreateLog(topic, part)
     val message = new Message(Integer.toString(42).getBytes())
     for(i <- 0 until 20)
-      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
+      log.append(new ByteBufferMessageSet(NoCompressionCodec, message), server.config.maxMessageSize)
     log.flush()
 
     Thread.sleep(100)
Index: core/src/test/scala/unit/kafka/log/LogManagerTest.scala
===================================================================
--- core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(revision 1376329)
+++ core/src/test/scala/unit/kafka/log/LogManagerTest.scala	(working copy)
@@ -58,7 +58,7 @@
     val log = logManager.getOrCreateLog(name, 0)
     val logFile = new File(config.logDir, name + "-0")
     assertTrue(logFile.exists)
-    log.append(TestUtils.singleMessageSet("test".getBytes()))
+    log.append(TestUtils.singleMessageSet("test".getBytes()), config.maxMessageSize)
   }
 
   @Test
@@ -75,7 +75,7 @@
     var offset = 0L
     for(i <- 0 until 1000) {
       var set = TestUtils.singleMessageSet("test".getBytes())
-      log.append(set)
+      log.append(set, config.maxMessageSize)
       offset += set.sizeInBytes
     }
     log.flush
@@ -97,7 +97,7 @@
       case e: OffsetOutOfRangeException => "This is good."
     }
     // log should still be appendable
-    log.append(TestUtils.singleMessageSet("test".getBytes()))
+    log.append(TestUtils.singleMessageSet("test".getBytes()), config.maxMessageSize)
   }
 
   @Test
@@ -125,7 +125,7 @@
     // add a bunch of messages that should be larger than the retentionSize
     for(i <- 0 until 1000) {
       val set = TestUtils.singleMessageSet("test".getBytes())
-      log.append(set)
+      log.append(set, config.maxMessageSize)
       offset += set.sizeInBytes
     }
     // flush to make sure it's written to disk, then sleep to confirm
@@ -146,7 +146,7 @@
       case e: OffsetOutOfRangeException => "This is good."
     }
     // log should still be appendable
-    log.append(TestUtils.singleMessageSet("test".getBytes()))
+    log.append(TestUtils.singleMessageSet("test".getBytes()), config.maxMessageSize)
   }
 
   @Test
@@ -166,7 +166,7 @@
     val log = logManager.getOrCreateLog("timebasedflush", 0)
     for(i <- 0 until 200) {
       var set = TestUtils.singleMessageSet("test".getBytes())
-      log.append(set)
+      log.append(set, config.maxMessageSize)
     }
 
     assertTrue("The last flush time has to be within defaultflushInterval of current time ",
@@ -192,7 +192,7 @@
       val log = logManager.getOrCreateLog("testPartition", i)
       for(i <- 0 until 250) {
         var set = TestUtils.singleMessageSet("test".getBytes())
-        log.append(set)
+        log.append(set, config.maxMessageSize)
       }
     }
 
Index: core/src/test/scala/other/kafka/TestLogPerformance.scala
===================================================================
--- core/src/test/scala/other/kafka/TestLogPerformance.scala	(revision 1376329)
+++ core/src/test/scala/other/kafka/TestLogPerformance.scala	(working copy)
@@ -19,10 +19,13 @@
 
 import kafka.message._
 import kafka.utils.{TestUtils, Utils}
+import kafka.server.KafkaConfig
 
 object TestLogPerformance {
 
   def main(args: Array[String]): Unit = {
+    val props = TestUtils.createBrokerConfig(0, -1)
+    val config = new KafkaConfig(props)
     if(args.length < 4)
       Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size batch_size compression_codec")
     val numMessages = args(0).toInt
@@ -41,7 +44,7 @@
     val numBatches = numMessages / batchSize
     val start = System.currentTimeMillis()
     for(i <- 0 until numBatches)
-      log.append(messageSet)
+      log.append(messageSet, config.maxMessageSize)
     log.close()
     val ellapsed = (System.currentTimeMillis() - start) / 1000.0
     val writtenBytes = MessageSet.entrySize(message) * numMessages
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala	(revision 1376329)
+++ core/src/main/scala/kafka/log/Log.scala	(working copy)
@@ -199,8 +199,9 @@
    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
    * Returns the offset at which the messages are written.
    */
-  def append(messages: ByteBufferMessageSet): Unit = {
+  def append(messages: ByteBufferMessageSet, maxMessageSize: Int): Unit = {
     // validate the messages
+    messages.verifyMessageSize(maxMessageSize)
     var numberOfMessages = 0
     for(messageAndOffset <- messages) {
       if(!messageAndOffset.message.isValid)
Index: core/src/main/scala/kafka/server/KafkaConfig.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaConfig.scala	(revision 1376329)
+++ core/src/main/scala/kafka/server/KafkaConfig.scala	(working copy)
@@ -42,7 +42,10 @@
   
   /* the maximum number of bytes in a socket request */
   val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))
-  
+
+  /* the maximum size of message that the server can receive */
+  val maxMessageSize = Utils.getIntInRange(props, "max.message.size", 1000000, (0, Int.MaxValue))
+
   /* the number of worker threads that the server uses for handling all client requests*/
   val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))
   
Index: core/src/main/scala/kafka/server/KafkaServer.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaServer.scala	(revision 1376329)
+++ core/src/main/scala/kafka/server/KafkaServer.scala	(working copy)
@@ -62,7 +62,7 @@
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)
                                                     
-    val handlers = new KafkaRequestHandlers(logManager)
+    val handlers = new KafkaRequestHandlers(logManager, config)
     socketServer = new SocketServer(config.port,
                                     config.numThreads,
                                     config.monitoringPeriodSecs,
Index: core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
===================================================================
--- core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(revision 1376329)
+++ core/src/main/scala/kafka/server/KafkaRequestHandlers.scala	(working copy)
@@ -29,7 +29,7 @@
 /**
  * Logic to handle the various Kafka requests
  */
-private[kafka] class KafkaRequestHandlers(val logManager: LogManager) extends Logging {
+private[kafka] class KafkaRequestHandlers(val logManager: LogManager, val config: KafkaConfig) extends Logging {
   
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
@@ -66,7 +66,7 @@
   private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
     val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
     try {
-      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
+      logManager.getOrCreateLog(request.topic, partition).append(request.messages, config.maxMessageSize)
       trace(request.messages.sizeInBytes + " bytes written to logs.")
       request.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
       BrokerTopicStat.getBrokerTopicStat(request.topic).recordBytesIn(request.messages.sizeInBytes)
