Index: core/src/main/scala/kafka/log/FileMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/log/FileMessageSet.scala (revision 1398915)
+++ core/src/main/scala/kafka/log/FileMessageSet.scala (working copy)
@@ -37,14 +37,18 @@
 class FileMessageSet private[kafka](val file: File,
                                     private[log] val channel: FileChannel,
                                     private[log] val start: Long = 0L,
-                                    private[log] val limit: Long = Long.MaxValue) extends MessageSet with Logging {
+                                    private[log] val limit: Long = Long.MaxValue,
+                                    initChannelPositionToEnd: Boolean = true) extends MessageSet with Logging {
 
   /* the size of the message set in bytes */
   private val _size = new AtomicLong(scala.math.min(channel.size(), limit) - start)
-
-  /* set the file position to the last byte in the file */
-  channel.position(channel.size)
-
+
+  if (initChannelPositionToEnd) {
+    /* set the file position to the last byte in the file */
+    channel.position(channel.size)
+    info("After changed to position %d with size %d".format(channel.position(), channel.size()))
+  }
+
   /**
    * Create a file message set with no limit or offset
    */
@@ -59,7 +63,11 @@
   * Return a message set which is a view into this set starting from the given position and with the given size limit.
   */
  def read(position: Long, size: Long): FileMessageSet = {
-    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()))
+    new FileMessageSet(file,
+                       channel,
+                       this.start + position,
+                       scala.math.min(this.start + position + size, sizeInBytes()),
+                       false)
  }
 
  /**
@@ -74,7 +82,8 @@
      buffer.rewind()
      channel.read(buffer, position)
      if(buffer.hasRemaining)
-        throw new IllegalStateException("Failed to read complete buffer.")
+        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
+                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()
      val offset = buffer.getLong()
      if(offset >= targetOffset)
@@ -92,7 +101,7 @@
    channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel)
 
  /**
-   * Get an iterator over the messages in the set
+   * Get an iterator over the messages in the set. We only do shallow iteration here.
   */
  override def iterator: Iterator[MessageAndOffset] = {
    new IteratorTemplate[MessageAndOffset] {
@@ -133,10 +142,8 @@
  /**
   * Append this message to the message set
   */
-  def append(messages: MessageSet): Unit = {
-    var written = 0L
-    while(written < messages.sizeInBytes)
-      written += messages.writeTo(channel, 0, messages.sizeInBytes)
+  def append(messages: ByteBufferMessageSet) {
+    val written = messages.writeTo(channel, 0, messages.sizeInBytes)
    _size.getAndAdd(written)
  }
 
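Note on the FileMessageSet changes above: read() now builds views that pass initChannelPositionToEnd = false, because a view shares its FileChannel with the owning set and FileChannel.position is shared mutable state; only the owning set should move the position to the end for appends. A minimal sketch of that distinction, assuming a plain FileChannel (openLog is a hypothetical helper, not part of this patch):

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel

// Hypothetical helper, not from the patch: open a log file for read/write.
def openLog(file: File): FileChannel =
  new RandomAccessFile(file, "rw").getChannel

val channel = openLog(new File("00000000000000000000.log"))
channel.position(channel.size) // owner only: relative writes now append at the end

// A read view uses the absolute-position overload of read, which never moves
// the shared position, so concurrent appends still land at the end of the file.
val buf = ByteBuffer.allocate(12)
channel.read(buf, 0L) // absolute read; channel.position() is unchanged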
Index: core/src/main/scala/kafka/log/Log.scala
===================================================================
--- core/src/main/scala/kafka/log/Log.scala (revision 1398915)
+++ core/src/main/scala/kafka/log/Log.scala (working copy)
@@ -268,7 +268,8 @@
    }
 
    // now append to the log
-    trace("Appending message set to " + this.name + ": " + validMessages)
+    trace("Appending message set to %s offset: %d nextOffset: %d messageSet: %s"
+          .format(this.name, offsets._1, nextOffset.get(), validMessages))
    segment.append(offsets._1, validMessages)
 
    // return the offset at which the messages were appended
@@ -315,7 +316,7 @@
        monotonic = false
      // update the last offset seen
      lastOffset = messageAndOffset.offset
-
+
      // check the validity of the message by checking CRC and message size
      val m = messageAndOffset.message
      m.ensureValid()
Index: core/src/main/scala/kafka/log/LogSegment.scala
===================================================================
--- core/src/main/scala/kafka/log/LogSegment.scala (revision 1398915)
+++ core/src/main/scala/kafka/log/LogSegment.scala (working copy)
@@ -2,9 +2,8 @@
 
 import scala.math._
 import java.io.File
-import kafka.common._
 import kafka.message._
-import kafka.utils.{Utils, Range, Time, SystemTime, nonthreadsafe}
+import kafka.utils._
 
 /**
  * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
@@ -19,7 +18,7 @@
                 val index: OffsetIndex,
                 val start: Long,
                 val indexIntervalBytes: Int,
-                 time: Time) extends Range {
+                 time: Time) extends Range with Logging {
 
  var firstAppendTime: Option[Long] = None
 
@@ -51,6 +50,7 @@
   */
  def append(offset: Long, messages: ByteBufferMessageSet) {
    if (messages.sizeInBytes > 0) {
+      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, messageSet.sizeInBytes()))
      // append an entry to the index (if needed)
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        index.append(offset, messageSet.sizeInBytes().toInt)
Index: core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
===================================================================
--- core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (revision 1398915)
+++ core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (working copy)
@@ -125,8 +125,11 @@
 
  /** Write the messages in this set to the given channel */
  def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = {
+    // Ignore offset and size from input. We just want to write the whole buffer to the channel.
    buffer.mark()
-    val written = channel.write(buffer)
+    var written = 0L
+    while(written < sizeInBytes)
+      written += channel.write(buffer)
    buffer.reset()
    written
  }
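The writeTo change above guards against short writes: a single GatheringByteChannel.write call may transfer fewer bytes than remain in the buffer (typical for socket channels), so the buffer has to be drained in a loop. A standalone sketch of the same pattern, with writeFully as an illustrative name rather than anything from the patch:

import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel

// Loop until the buffer is fully drained; each write() call may be partial.
def writeFully(channel: GatheringByteChannel, buffer: ByteBuffer): Long = {
  var written = 0L
  while (buffer.hasRemaining)
    written += channel.write(buffer)
  written
}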
Index: core/src/main/scala/kafka/tools/DumpLogSegments.scala
===================================================================
--- core/src/main/scala/kafka/tools/DumpLogSegments.scala (revision 1398915)
+++ core/src/main/scala/kafka/tools/DumpLogSegments.scala (working copy)
@@ -47,9 +47,9 @@
    for(i <- 0 until index.entries) {
      val entry = index.entry(i)
      // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
-      if(entry.offset <= startOffset)
+      if(entry.offset == 0 && i > 0)
        return
-      println("offset: %d position: %d".format(entry.offset, entry.position))
+      println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
    }
  }
 
@@ -61,10 +61,10 @@
    var validBytes = 0L
    for(messageAndOffset <- messageSet) {
      val msg = messageAndOffset.message
+      print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
+            " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
+            " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
      validBytes += MessageSet.entrySize(msg)
-      print("offset: " + messageAndOffset.offset + " isvalid: " + msg.isValid +
-            " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
-            " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
      if(msg.hasKey)
        print(" keysize: " + msg.keySize)
      if(printContents) {
Index: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
===================================================================
--- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (revision 1398915)
+++ core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (working copy)
@@ -46,15 +46,15 @@
    if (fetchOffset != replica.logEndOffset)
      throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
 
-    trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d".format(replica.brokerId,
-      replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
+    trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d"
+          .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, partitionData.hw))
    replica.log.get.append(messageSet, assignOffsets = false)
    trace("Follower %d has replica log end offset %d after appending %d bytes of messages"
-      .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
+          .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes))
    val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
    replica.highWatermark = followerHighWatermark
    trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
-      .format(replica.brokerId, topic, partitionId, followerHighWatermark))
+          .format(replica.brokerId, topic, partitionId, followerHighWatermark))
  }
 
  // handle a partition whose offset is out of range and return a new fetch offset
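The DumpLogSegments change reflects that index entries hold offsets relative to the segment's base offset (the number encoded in the file name), so the absolute offset printed by the tool is baseOffset + relativeOffset. A worked example with illustrative values:

// For a segment index named 00000000000000368769.index:
val baseOffset = 368769L                          // parsed from the file name
val relativeOffset = 42L                          // as stored in an index entry
val absoluteOffset = baseOffset + relativeOffset  // 368811, the offset printed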