diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index f6ee475..c1a64c1 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -270,33 +270,32 @@ private[kafka] class Log(val dir: File,
 
     try {
       // they are valid, insert them in the log
-      val offsetsAndNumAppendedMessages = lock synchronized {
+      val offsets = lock synchronized {
         val firstOffset = nextOffset.get
 
         // maybe roll the log if this segment is full
         val segment = maybeRoll(segments.view.last)
 
         // assign offsets to the messageset
-        val offsets =
+        val lastOffset =
           if(assignOffsets) {
             val offsetCounter = new AtomicLong(nextOffset.get)
-            val firstOffset = offsetCounter.get
             try {
               validMessages = validMessages.assignOffsets(offsetCounter, messageSetInfo.codec)
             } catch {
               case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
             }
-            val lastOffset = offsetCounter.get - 1
-            val numMessages = lastOffset - firstOffset + 1
+            val assignedLastOffset = offsetCounter.get - 1
+            val numMessages = assignedLastOffset - firstOffset + 1
             BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
             BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
-            (firstOffset, lastOffset)
+            assignedLastOffset
           } else {
             require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)
             require(messageSetInfo.firstOffset >= nextOffset.get,
                     "Attempt to append a message set beginning with offset %d to a log with log end offset %d."
                     .format(messageSetInfo.firstOffset, nextOffset.get))
-            (messageSetInfo.firstOffset, messageSetInfo.lastOffset)
+            messageSetInfo.lastOffset
          }
 
        // Check if the message sizes are valid. This check is done after assigning offsets to ensure the comparison
@@ -308,23 +307,24 @@ private[kafka] class Log(val dir: File,
         }
 
         // now append to the log
-        trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
-              .format(this.name, offsets._1, nextOffset.get(), validMessages))
-        segment.append(offsets._1, validMessages)
+        segment.append(firstOffset, validMessages)
 
         // advance the log end offset
-        nextOffset.set(offsets._2 + 1)
-        val numAppendedMessages = (nextOffset.get - firstOffset).toInt
+        nextOffset.set(lastOffset + 1)
+
+        trace("Appended message set to %s offset: %d nextOffset: %d messageSet: %s"
+              .format(this.name, firstOffset, nextOffset.get(), validMessages))
 
         // return the offset at which the messages were appended
-        (offsets._1, offsets._2, numAppendedMessages)
+        (firstOffset, lastOffset)
       }
 
       // maybe flush the log and index
-      maybeFlush(offsetsAndNumAppendedMessages._3)
+      val numAppendedMessages = (offsets._2 - offsets._1 + 1).toInt
+      maybeFlush(numAppendedMessages)
 
       // return the first and last offset
-      (offsetsAndNumAppendedMessages._1, offsetsAndNumAppendedMessages._2)
+      (offsets._1, offsets._2)
     } catch {
       case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 786ae03..4ed88e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -379,15 +379,15 @@ class LogTest extends JUnitSuite {
                       needsRecovery = true)
     val messages = List("one", "two", "three", "four", "five", "six")
     val ms = new ByteBufferMessageSet(compressionCodec = codec,
-                                      offsetCounter = new AtomicLong(5),
+                                      offsetCounter = new AtomicLong(0),
                                       messages = messages.map(s => new Message(s.getBytes)):_*)
-    val firstOffset = ms.shallowIterator.toList.head.offset
-    val lastOffset = ms.shallowIterator.toList.last.offset
+    val firstOffset = ms.toList.head.offset
+    val lastOffset = ms.toList.last.offset
     val (first, last) = log.append(ms, assignOffsets = false)
     assertEquals(last + 1, log.logEndOffset)
     assertEquals(firstOffset, first)
     assertEquals(lastOffset, last)
-    assertTrue(log.read(5, 64*1024).size > 0)
+    assertTrue(log.read(0, 64*1024).size > 0)
     log.delete()
   }
 