From 268e802311e010da4186c09c07837e6a59d49c3d Mon Sep 17 00:00:00 2001 From: Sriram Subramanian Date: Thu, 21 Feb 2013 23:40:29 -0800 Subject: [PATCH] Move size check after assigning offsets and re-compression. --- core/src/main/scala/kafka/log/Log.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 5cd36e0..85d1956 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -279,7 +279,14 @@ private[kafka] class Log(val dir: File, throw new IllegalArgumentException("Out of order offsets found in " + messages) (messageSetInfo.firstOffset, messageSetInfo.lastOffset) } - + + // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison + // happens with the new message size (after re-compression, if any) + for(messageAndOffset <- validMessages.shallowIterator) { + if(MessageSet.entrySize(messageAndOffset.message) > maxMessageSize) + throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(messageAndOffset.message), maxMessageSize)) + } + // now append to the log trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s" .format(this.name, offsets._1, nextOffset.get(), validMessages)) @@ -308,8 +315,7 @@ private[kafka] class Log(val dir: File, /** * Validate the following: - * 1. each message is not too large - * 2. each message matches its CRC + * 1. each message matches its CRC * * Also compute the following quantities: * 1. First offset in the message set @@ -333,12 +339,10 @@ private[kafka] class Log(val dir: File, // update the last offset seen lastOffset = messageAndOffset.offset - // check the validity of the message by checking CRC and message size + // check the validity of the message by checking CRC val m = messageAndOffset.message m.ensureValid() - if(MessageSet.entrySize(m) > maxMessageSize) - throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize)) - + messageCount += 1; val messageCodec = m.compressionCodec -- 1.7.12.4 (Apple Git-37)