diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 2309333..c49308e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -227,67 +227,113 @@ private[kafka] class Log(val dir: File,
   /**
    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
+   *
+   * This method will generally be responsible for assigning offsets to the messages;
+   * however, if the assignOffsets = false flag is passed, we will only check that the existing offsets are valid.
+   *
    * Returns a tuple containing (first_offset, last_offset) for the newly appended of the message set,
    * or (-1,-1) if the message set is empty
    */
-  def append(messages: ByteBufferMessageSet): (Long, Long) = {
-    // check that all messages are valid and see if we have any compressed messages
-    var messageCount = 0
-    var codec: CompressionCodec = NoCompressionCodec
-    for(messageAndOffset <- messages.shallowIterator) {
-      val m = messageAndOffset.message
-      m.ensureValid()
-      if(MessageSet.entrySize(m) > maxMessageSize)
-        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize))
-      messageCount += 1;
-      val messageCodec = m.compressionCodec
-      if(messageCodec != NoCompressionCodec)
-        codec = messageCodec
-    }
-
+  def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): (Long, Long) = {
+    val messageSetInfo = analyzeAndValidateMessageSet(messages)
+
     // if we have any valid messages, append them to the log
-    if(messageCount == 0) {
+    if(messageSetInfo.count == 0) {
       (-1L, -1L)
     } else {
-      BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageCount)
-      BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageCount)
+      BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageSetInfo.count)
+      BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageSetInfo.count)
 
       // trim any invalid bytes or partial messages before appending it to the on-disk log
       var validMessages = trimInvalidBytes(messages)
 
       // they are valid, insert them in the log
-      lock synchronized {
+      val offsets = lock synchronized {
         try {
-          val firstOffset = nextOffset.get
-
-          // maybe roll the log
+          // maybe roll the log if this segment is full
          val segment = maybeRoll(segments.view.last)
 
-          // assign offsets to the messages
-          validMessages = validMessages.assignOffsets(nextOffset, codec)
+          // assign offsets to the message set
+          val offsets =
+            if(assignOffsets) {
+              val firstOffset = nextOffset.get
+              validMessages = validMessages.assignOffsets(nextOffset, messageSetInfo.codec)
+              val lastOffset = nextOffset.get - 1
+              (firstOffset, lastOffset)
+            } else {
+              if(!messageSetInfo.offsetsMonotonic)
+                throw new IllegalArgumentException("Out of order offsets found in " + messages)
+              nextOffset.set(messageSetInfo.lastOffset + 1)
+              (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
+            }
 
-          trace("Appending message set to " + this.name + ": " + validMessages)
-          // now append to the log
-          segment.append(firstOffset, validMessages)
-          val lastOffset = nextOffset.get - 1
-
-          // maybe flush the log and index
-          maybeFlush(messageCount)
+          trace("Appending message set to " + this.name + ": " + validMessages)
+          segment.append(offsets._1, validMessages)
 
           // return the offset at which the messages were appended
-          (firstOffset, lastOffset)
+          offsets
         } catch {
           case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
         }
       }
+
+      // maybe flush the log and index
+      maybeFlush(messageSetInfo.count)
+
+      // return the first and last offset
+      offsets
+    }
+  }
+
+  /* struct to hold various quantities we compute about each message set before appending to the log */
+  case class MessageSetAppendInfo(firstOffset: Long, lastOffset: Long, codec: CompressionCodec, count: Int, offsetsMonotonic: Boolean)
+
+  /**
+   * Validate the following:
+   * 1. each message is not too large
+   * 2. each message matches its CRC
+   *
+   * Also compute the following quantities:
+   * 1. First offset in the message set
+   * 2. Last offset in the message set
+   * 3. Number of messages
+   * 4. Whether the offsets are monotonically increasing
+   * 5. Whether any compression codec is used (if many are used, then the last one is given)
+   */
+  private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): MessageSetAppendInfo = {
+    var messageCount = 0
+    var firstOffset, lastOffset = -1L
+    var codec: CompressionCodec = NoCompressionCodec
+    var monotonic = true
+    for(messageAndOffset <- messages.shallowIterator) {
+      // update the first offset if on the first message
+      if(firstOffset < 0)
+        firstOffset = messageAndOffset.offset
+      // check that offsets are monotonically increasing
+      if(lastOffset >= messageAndOffset.offset)
+        monotonic = false
+      // update the last offset seen
+      lastOffset = messageAndOffset.offset
+
+      // check the validity of the message by checking CRC and message size
+      val m = messageAndOffset.message
+      m.ensureValid()
+      if(MessageSet.entrySize(m) > maxMessageSize)
+        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize))
+
+      messageCount += 1;
+
+      val messageCodec = m.compressionCodec
+      if(messageCodec != NoCompressionCodec)
+        codec = messageCodec
     }
+    MessageSetAppendInfo(firstOffset, lastOffset, codec, messageCount, monotonic)
   }
 
   /**
    * Trim any invalid bytes from the end of this message set (if there are any)
    */
-  def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
+  private def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
     val messageSetValidBytes = messages.validBytes
     if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
       throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
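
To make the contract of the new analyzeAndValidateMessageSet pass concrete, here is a minimal standalone sketch that computes the same quantities (first offset, last offset, message count, monotonicity) over a toy message set. Entry, AppendInfo and analyze are hypothetical stand-ins for MessageAndOffset, MessageSetAppendInfo and the private method above, not Kafka code:

    object AppendAnalysisSketch {
      // hypothetical stand-in for a shallow MessageAndOffset entry
      case class Entry(offset: Long, sizeInBytes: Int)

      // hypothetical analogue of MessageSetAppendInfo, minus the codec field
      case class AppendInfo(firstOffset: Long, lastOffset: Long, count: Int, offsetsMonotonic: Boolean)

      def analyze(entries: Seq[Entry], maxSize: Int): AppendInfo = {
        var first, last = -1L
        var count = 0
        var monotonic = true
        for(e <- entries) {
          if(first < 0)
            first = e.offset            // remember the first offset we see
          if(last >= e.offset)
            monotonic = false           // offsets must be strictly increasing
          last = e.offset
          if(e.sizeInBytes > maxSize)   // size check, analogous to MessageSizeTooLargeException
            throw new IllegalArgumentException("Entry of %d bytes exceeds limit of %d".format(e.sizeInBytes, maxSize))
          count += 1
        }
        AppendInfo(first, last, count, monotonic)
      }

      def main(args: Array[String]): Unit = {
        // offsets 5,6,7 are monotonic: AppendInfo(5,7,3,true), so an append with
        // assignOffsets = false would be accepted and nextOffset would advance to 8
        println(analyze(Seq(Entry(5, 10), Entry(6, 12), Entry(7, 9)), 1024))
        // offsets 5,7,6 are not monotonic: offsetsMonotonic = false, so such an append
        // with assignOffsets = false would be rejected with IllegalArgumentException
        println(analyze(Seq(Entry(5, 10), Entry(7, 12), Entry(6, 9)), 1024))
      }
    }
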
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2296cb3..f69fb18 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -184,7 +184,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     try {
       val localReplica = replicaManager.getLeaderReplicaIfLocal(key.topic, key.partition)
       val log = localReplica.log.get
-      val (start, end) = log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+      val (start, end) = log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet], assignOffsets = true)
       // we may need to increment high watermark since ISR could be down to 1
       localReplica.partition.maybeIncrementLeaderHW(localReplica)
       trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index c790e79..324f5f8 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -39,7 +39,7 @@ class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: Kafk
       throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
     trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d".format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
-    replica.log.get.append(messageSet)
+    replica.log.get.append(messageSet, assignOffsets = false)
     trace("Follower %d has replica log end offset %d after appending %d bytes of messages"
       .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
     val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
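
The two call sites above split the responsibility: the leader path in KafkaApis lets the log assign offsets (assignOffsets = true), while the follower path in ReplicaFetcherThread appends messages whose offsets were already assigned by the leader (assignOffsets = false). The following is a minimal usage sketch of that split against the new Log API, mirroring the constructor calls used by the test below; the directory path, size limits, and message contents are made-up values, not part of this patch:

    import java.io.File
    import java.util.concurrent.atomic.AtomicLong
    import kafka.log.Log
    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

    object AssignOffsetsUsageSketch {
      def main(args: Array[String]): Unit = {
        val dir = new File("/tmp/assign-offsets-sketch")   // made-up scratch directory
        dir.mkdirs()
        val log = new Log(dir,
                          maxLogFileSize = 64*1024,
                          maxMessageSize = 1000000,        // made-up limit
                          maxIndexSize = 1000,
                          indexIntervalBytes = 10000,
                          needsRecovery = false)

        // leader-style append: the log assigns the offsets and returns the range it used
        val produced = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                offsetCounter = new AtomicLong(0),
                                                messages = List("a", "b").map(s => new Message(s.getBytes)):_*)
        val (start, end) = log.append(produced, assignOffsets = true)

        // follower-style append: offsets come pre-assigned (here continuing at end + 1), so the
        // log only checks that they increase monotonically and advances its end offset
        val replicated = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                                                  offsetCounter = new AtomicLong(end + 1),
                                                  messages = List("c").map(s => new Message(s.getBytes)):_*)
        log.append(replicated, assignOffsets = false)

        println("appended leader batch at [%d, %d], log end offset is now %d".format(start, end, log.logEndOffset))
        log.delete()
      }
    }
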
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 2f49139..1e7217c 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -19,6 +19,7 @@ package kafka.log
 
 import java.io._
 import java.util.ArrayList
+import java.util.concurrent.atomic._
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.{After, Before, Test}
@@ -341,6 +342,31 @@ class LogTest extends JUnitSuite {
     assertEquals("Should change offset", 0, log.logEndOffset)
     assertEquals("Should change log size", log.size, 0)
   }
+
+  @Test
+  def testAppendWithoutOffsetAssignment() {
+    for(codec <- List(NoCompressionCodec, DefaultCompressionCodec)) {
+      logDir.mkdir()
+      var log = new Log(logDir,
+                        maxLogFileSize = 64*1024,
+                        maxMessageSize = config.maxMessageSize,
+                        maxIndexSize = 1000,
+                        indexIntervalBytes = 10000,
+                        needsRecovery = true)
+      val messages = List("one", "two", "three", "four", "five", "six")
+      val ms = new ByteBufferMessageSet(compressionCodec = codec,
+                                        offsetCounter = new AtomicLong(5),
+                                        messages = messages.map(s => new Message(s.getBytes)):_*)
+      val firstOffset = ms.shallowIterator.toList.head.offset
+      val lastOffset = ms.shallowIterator.toList.last.offset
+      val (first, last) = log.append(ms, assignOffsets = false)
+      assertEquals(last + 1, log.logEndOffset)
+      assertEquals(firstOffset, first)
+      assertEquals(lastOffset, last)
+      assertTrue(log.read(5, 64*1024).size > 0)
+      log.delete()
+    }
+  }
 
   @Test
   def testReopenThenTruncate() {